Indentation cleanups - WHITESPACE ONLY

Change-Id: I58d5d4da6f703d07a2c31ecb109497b46be0d5d8
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index b52bafc..1c0802a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -47,13 +47,13 @@
  * FlowPusher is a implementation of FlowPusherService.
  * FlowPusher assigns one message queue instance for each one switch.
  * Number of message processing threads is configurable by constructor, and
- * one thread can handle multiple message queues. Each queue will be assigned to 
+ * one thread can handle multiple message queues. Each queue will be assigned to
  * a thread according to hash function defined by getHash().
  * Each processing thread reads messages from queues and sends it to switches
  * in round-robin. Processing thread also calculates rate of sending to suppress
  * excessive message sending.
- * @author Naoki Shiota
  *
+ * @author Naoki Shiota
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
@@ -63,995 +63,1006 @@
     // The local copy should go away!
     //
     protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
-    protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
-    
+    protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;    // ms
+
     // Number of messages sent to switch at once
     protected static final int MAX_MESSAGE_SEND = 100;
 
-	private static class SwitchQueueEntry {
-		OFMessage msg;
-		
-		public SwitchQueueEntry(OFMessage msg) {
-			this.msg = msg;
-		}
-		
-		public OFMessage getOFMessage() {
-			return msg;
-		}
-	}
-	
-	/**
-	 * SwitchQueue represents message queue attached to a switch.
-	 * This consists of queue itself and variables used for limiting sending rate.
-	 * @author Naoki Shiota
-	 *
-	 */
-	private class SwitchQueue {
-		List<Queue<SwitchQueueEntry>> raw_queues;
-		QueueState state;
-		
-		// Max rate of sending message (bytes/ms). 0 implies no limitation.
-		long max_rate = 0;	// 0 indicates no limitation
-		long last_sent_time = 0;
-		long last_sent_size = 0;
-		
-		// "To be deleted" flag
-		boolean toBeDeleted = false;
-		
-		SwitchQueue() {
-			raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
-					 MsgPriority.values().length);
-			for (int i = 0; i < MsgPriority.values().length; ++i) {
-				raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
-			}
-			
-			state = QueueState.READY;
-		}
-		
-		/**
-		 * Check if sending rate is within the rate
-		 * @param current Current time
-		 * @return true if within the rate
-		 */
-		boolean isSendable(long current) {
-			if (max_rate == 0) {
-				// no limitation
-				return true;
-			}
-			
-			if (current == last_sent_time) {
-				return false;
-			}
-			
-			// Check if sufficient time (from aspect of rate) elapsed or not.
-			long rate = last_sent_size / (current - last_sent_time);
-			return (rate < max_rate);
-		}
-		
-		/**
-		 * Log time and size of last sent data.
-		 * @param current Time to be sent.
-		 * @param size Size of sent data (in bytes).
-		 */
-		void logSentData(long current, long size) {
-			last_sent_time = current;
-			last_sent_size = size;
-		}
-		
-		boolean add(SwitchQueueEntry entry, MsgPriority priority) {
-			Queue<SwitchQueueEntry> queue = getQueue(priority);
-			if (queue == null) {
-				log.error("Unexpected priority : ", priority);
-				return false;
-			}
-			return queue.add(entry);
-		}
-		
-		/**
-		 * Poll single appropriate entry object according to QueueState.
-		 * @return Entry object.
-		 */
-		SwitchQueueEntry poll() {
-			switch (state) {
-			case READY:
-			{
-				for (int i = 0; i < raw_queues.size(); ++i) {
-					SwitchQueueEntry entry = raw_queues.get(i).poll();
-					if (entry != null) {
-						return entry;
-					}
-				}
-				
-				return null;
-			}
-			case SUSPENDED:
-			{
-				// Only polling from high priority queue
-				SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
-				return entry;
-			}
-			default:
-				log.error("Unexpected QueueState : ", state);
-				return null;
-			}
-		}
-		
-		/**
-		 * Check if this object has any messages in the queues to be sent
-		 * @return True if there are some messages to be sent.
-		 */
-		boolean hasMessageToSend() {
-			switch (state) {
-			case READY:
-				for (Queue<SwitchQueueEntry> queue : raw_queues) {
-					if (! queue.isEmpty()) {
-						return true;
-					}
-				}
-			break;
-			case SUSPENDED:
-				// Only checking high priority queue
-				return (! getQueue(MsgPriority.HIGH).isEmpty());
-			default:
-				log.error("Unexpected QueueState : ", state);
-				return false;
-			}
-			
-			return false;
-		}
-		
-		Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
-			return raw_queues.get(priority.ordinal());
-		}
-	}
-	
-	/**
-	 * BarrierInfo holds information to specify barrier message sent to switch.
-	 * @author Naoki
-	 */
-	private static class BarrierInfo {
-		final long dpid;
-		final int xid;
-		
-		static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
-			return new BarrierInfo(sw.getId(), req.getXid());
-		}
-		
-		static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
-			return new BarrierInfo(sw.getId(), rpy.getXid());
-		}
-		
-		private BarrierInfo(long dpid, int xid) {
-			this.dpid = dpid;
-			this.xid = xid;
-		}
+    private static class SwitchQueueEntry {
+        OFMessage msg;
 
-		// Auto generated code by Eclipse
-		@Override
-		public int hashCode() {
-			final int prime = 31;
-			int result = 1;
-			result = prime * result + (int) (dpid ^ (dpid >>> 32));
-			result = prime * result + xid;
-			return result;
-		}
+        public SwitchQueueEntry(OFMessage msg) {
+            this.msg = msg;
+        }
 
-		@Override
-		public boolean equals(Object obj) {
-			if (this == obj)
-				return true;
-			if (obj == null)
-				return false;
-			if (getClass() != obj.getClass())
-				return false;
-			
-			BarrierInfo other = (BarrierInfo) obj;
-			return (this.dpid == other.dpid) && (this.xid == other.xid);
-		}
-		
-		
-	}
-	
-	private OFMessageDamper messageDamper = null;
-	private IThreadPoolService threadPool = null;
+        public OFMessage getOFMessage() {
+            return msg;
+        }
+    }
 
-	private FloodlightContext context = null;
-	private BasicFactory factory = null;
-	
-	// Map of threads versus dpid
-	private Map<Long, FlowPusherThread> threadMap = null;
-	// Map from (DPID and transaction ID) to Future objects.
-	private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
-		= new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
-	
-	private int number_thread;
-	
-	/**
-	 * Main thread that reads messages from queues and sends them to switches.
-	 * @author Naoki Shiota
-	 *
-	 */
-	private class FlowPusherThread extends Thread {
-		private Map<IOFSwitch,SwitchQueue> assignedQueues
-			= new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
-		
-		final Lock queuingLock = new ReentrantLock();
-		final Condition messagePushed = queuingLock.newCondition();
-		
-		@Override
-		public void run() {
-			this.setName("FlowPusherThread " + this.getId() );
-			while (true) {
-				while (! queuesHasMessageToSend()) {
-					queuingLock.lock();
-					
-					try {
-						// wait for message pushed to queue
-						messagePushed.await();
-					} catch (InterruptedException e) {
-						// Interrupted to be shut down (not an error)
-						log.debug("FlowPusherThread is interrupted");
-						return;
-					} finally {
-						queuingLock.unlock();
-					}
-				}
-				
-				// for safety of concurrent access, copy set of key objects
-				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
-				for (IOFSwitch sw : assignedQueues.keySet()) {
-					keys.add(sw);
-				}
-				
-				for (IOFSwitch sw : keys) {
-					SwitchQueue queue = assignedQueues.get(sw);
+    /**
+     * SwitchQueue represents message queue attached to a switch.
+     * This consists of queue itself and variables used for limiting sending rate.
+     *
+     * @author Naoki Shiota
+     */
+    private class SwitchQueue {
+        List<Queue<SwitchQueueEntry>> raw_queues;
+        QueueState state;
 
-					if (sw == null || queue == null) {
-						continue;
-					}
-					
-					synchronized (queue) {
-						processQueue(sw, queue, MAX_MESSAGE_SEND);
-						if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
-							// remove queue if flagged to be.
-							assignedQueues.remove(sw);
-						}
-					}
-				}
-			}
-		}
-		
-		/**
-		 * Read messages from queue and send them to the switch.
-		 * If number of messages excess the limit, stop sending messages.
-		 * @param sw Switch to which messages will be sent.
-		 * @param queue Queue of messages.
-		 * @param max_msg Limitation of number of messages to be sent. If set to 0,
-		 *                all messages in queue will be sent.
-		 */
-		private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
-			// check sending rate and determine it to be sent or not
-			long current_time = System.currentTimeMillis();
-			long size = 0;
-			
-			if (queue.isSendable(current_time)) {
-				int i = 0;
-				while (queue.hasMessageToSend()) {
-					// Number of messages excess the limit
-					if (0 < max_msg && max_msg <= i) {
-						break;
-					}
-					++i;
-					
-					SwitchQueueEntry queueEntry;
-					synchronized (queue) {
-						queueEntry = queue.poll();
-					}
-					
-					OFMessage msg = queueEntry.getOFMessage();
-					try {
-						messageDamper.write(sw, msg, context);
-						if (log.isTraceEnabled()) {
-							log.trace("Pusher sends message : {}", msg);
-						}
-						size += msg.getLength();
-					} catch (IOException e) {
-						e.printStackTrace();
-						log.error("Exception in sending message ({}) : {}", msg, e);
-					}
-				}
-				
-				sw.flush();
-				queue.logSentData(current_time, size);
-			}
-		}
-		
-		private boolean queuesHasMessageToSend() {
-			for (SwitchQueue queue : assignedQueues.values()) {
-				if (queue.hasMessageToSend()) {
-					return true;
-				}
-			}
+        // Max rate of sending message (bytes/ms). 0 implies no limitation.
+        long max_rate = 0;    // 0 indicates no limitation
+        long last_sent_time = 0;
+        long last_sent_size = 0;
 
-			return false;
-		}
-		
-		private void notifyMessagePushed() {
-			queuingLock.lock();
-			try {
-				messagePushed.signal();
-			} finally {
-				queuingLock.unlock();
-			}
-		}
-	}
-	
-	/**
-	 * Initialize object with one thread.
-	 */
-	public FlowPusher() {
-		number_thread = DEFAULT_NUMBER_THREAD;
-	}
-	
-	/**
-	 * Initialize object with threads of given number.
-	 * @param number_thread Number of threads to handle messages.
-	 */
-	public FlowPusher(int number_thread) {
-		if (number_thread > 0) {
-			this.number_thread = number_thread;
-		} else {
-			this.number_thread = DEFAULT_NUMBER_THREAD;
-		}
-	}
-	
-	/**
-	 * Set parameters needed for sending messages.
-	 * @param context FloodlightContext used for sending messages.
-	 *        If null, FlowPusher uses default context.
-	 * @param modContext FloodlightModuleContext used for acquiring
-	 *        ThreadPoolService and registering MessageListener.
-	 * @param factory Factory object to create OFMessage objects.
-	 * @param damper Message damper used for sending messages.
-	 *        If null, FlowPusher creates its own damper object.
-	 */
-	public void init(FloodlightContext context,
-			FloodlightModuleContext modContext,
-			BasicFactory factory,
-			OFMessageDamper damper) {
-		this.context = context;
-		this.factory = factory;
-		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
-		IFloodlightProviderService flservice
-			= modContext.getServiceImpl(IFloodlightProviderService.class);
-		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
-		
-		if (damper != null) {
-			messageDamper = damper;
-		} else {
-			// use default values
-			messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
-				    EnumSet.of(OFType.FLOW_MOD),
-				    OFMESSAGE_DAMPER_TIMEOUT);
-		}
-	}
-	
-	/**
-	 * Begin processing queue.
-	 */
-	public void start() {
-		if (factory == null) {
-			log.error("FlowPusher not yet initialized.");
-			return;
-		}
-		
-		threadMap = new HashMap<Long,FlowPusherThread>();
-		for (long i = 0; i < number_thread; ++i) {
-			FlowPusherThread thread = new FlowPusherThread();
-			
-			threadMap.put(i, thread);
-			thread.start();
-		}
-	}
-	
-	@Override
-	public boolean suspend(IOFSwitch sw) {
-		SwitchQueue queue = getQueue(sw);
-		
-		if (queue == null) {
-			// create queue in case suspend is called before first message addition
-			queue = createQueueImpl(sw);
-		}
-		
-		synchronized (queue) {
-			if (queue.state == QueueState.READY) {
-				queue.state = QueueState.SUSPENDED;
-				return true;
-			}
-			return false;
-		}
-	}
+        // "To be deleted" flag
+        boolean toBeDeleted = false;
 
-	@Override
-	public boolean resume(IOFSwitch sw) {
-		SwitchQueue queue = getQueue(sw);
-		
-		if (queue == null) {
-			log.error("No queue is attached to DPID : {}", sw.getId());
-			return false;
-		}
-		
-		synchronized (queue) {
-			if (queue.state == QueueState.SUSPENDED) {
-				queue.state = QueueState.READY;
-				
-				// Free the latch if queue has any messages
-				FlowPusherThread thread = getProcessingThread(sw);
-				if (queue.hasMessageToSend()) {
-					thread.notifyMessagePushed();
-				}
-				return true;
-			}
-			return false;
-		}
-	}
-	
-	@Override
-	public QueueState getState(IOFSwitch sw) {
-		SwitchQueue queue = getQueue(sw);
-		
-		if (queue == null) {
-			return QueueState.UNKNOWN;
-		}
-		
-		return queue.state;
-	}
+        SwitchQueue() {
+            raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+                    MsgPriority.values().length);
+            for (int i = 0; i < MsgPriority.values().length; ++i) {
+                raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+            }
 
-	/**
-	 * Stop processing queue and exit thread.
-	 */
-	public void stop() {
-		if (threadMap == null) {
-			return;
-		}
-		
-		for (FlowPusherThread t : threadMap.values()) {
-			t.interrupt();
-		}
-	}
-	
-	@Override
-	public void setRate(IOFSwitch sw, long rate) {
-		SwitchQueue queue = getQueue(sw);
-		if (queue == null) {
-			queue = createQueueImpl(sw);
-		}
-		
-		if (rate > 0) {
-			log.debug("rate for {} is set to {}", sw.getId(), rate);
-			synchronized (queue) {
-				queue.max_rate = rate;
-			}
-		}
-	}
+            state = QueueState.READY;
+        }
 
-	@Override
-	public boolean createQueue(IOFSwitch sw) {
-		SwitchQueue queue = createQueueImpl(sw);
-		
-		return (queue != null);
-	}
-	
-	protected SwitchQueue createQueueImpl(IOFSwitch sw) {
-		SwitchQueue queue = getQueue(sw);
-		if (queue != null) {
-			return queue;
-		}
-		
-		FlowPusherThread proc = getProcessingThread(sw);
-		queue = new SwitchQueue();
-		queue.state = QueueState.READY;
-		proc.assignedQueues.put(sw, queue);
-		
-		return queue;
-	}
+        /**
+         * Check if sending rate is within the rate
+         *
+         * @param current Current time
+         * @return true if within the rate
+         */
+        boolean isSendable(long current) {
+            if (max_rate == 0) {
+                // no limitation
+                return true;
+            }
 
-	@Override
-	public boolean deleteQueue(IOFSwitch sw) {
-		return deleteQueue(sw, false);
-	}
-	
-	@Override
-	public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
-		FlowPusherThread proc = getProcessingThread(sw);
-		
-		if (forceStop) {
-			SwitchQueue queue = proc.assignedQueues.remove(sw);
-			if (queue == null) {
-				return false;
-			}
-			return true;
-		} else {
-			SwitchQueue queue = getQueue(sw);
-			if (queue == null) {
-				return false;
-			}
-			synchronized (queue) {
-				queue.toBeDeleted = true;
-			}
-			return true;
-		}
-	}
-	
-	@Override
-	public boolean add(IOFSwitch sw, OFMessage msg) {
-		return add(sw, msg, MsgPriority.NORMAL);
-	}
-	
-	@Override
-	public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
-		return addMessageImpl(sw, msg, priority);
-	}
-	
-	@Override
-	public void pushFlowEntries(
-		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
-		pushFlowEntries(entries, MsgPriority.NORMAL);
-	}
-	
-	@Override
-	public void pushFlowEntries(
-		Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+            if (current == last_sent_time) {
+                return false;
+            }
 
-		for (Pair<IOFSwitch, FlowEntry> entry : entries) {
-			add(entry.first, entry.second, priority);
-		}
-	}
+            // Check if sufficient time (from aspect of rate) elapsed or not.
+            long rate = last_sent_size / (current - last_sent_time);
+            return (rate < max_rate);
+        }
 
-	@Override
-	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
-	    pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
-	}
+        /**
+         * Log time and size of last sent data.
+         *
+         * @param current Time to be sent.
+         * @param size    Size of sent data (in bytes).
+         */
+        void logSentData(long current, long size) {
+            last_sent_time = current;
+            last_sent_size = size;
+        }
 
-	@Override
-	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
-	    Collection<Pair<IOFSwitch, FlowEntry>> entries = 
-		new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+        boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+            Queue<SwitchQueueEntry> queue = getQueue(priority);
+            if (queue == null) {
+                log.error("Unexpected priority : ", priority);
+                return false;
+            }
+            return queue.add(entry);
+        }
 
-	    entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
-	    pushFlowEntries(entries, priority);
-	}
-	
-	/**
-	 * Create a message from FlowEntry and add it to the queue of the switch.
-	 * @param sw Switch to which message is pushed.
-	 * @param flowEntry FlowEntry object used for creating message.
-	 * @return true if message is successfully added to a queue.
-	 */
-	private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
-		//
-		// Create the OpenFlow Flow Modification Entry to push
-		//
-		OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
-		long cookie = flowEntry.flowEntryId().value();
+        /**
+         * Poll single appropriate entry object according to QueueState.
+         *
+         * @return Entry object.
+         */
+        SwitchQueueEntry poll() {
+            switch (state) {
+                case READY: {
+                    for (int i = 0; i < raw_queues.size(); ++i) {
+                        SwitchQueueEntry entry = raw_queues.get(i).poll();
+                        if (entry != null) {
+                            return entry;
+                        }
+                    }
 
-		short flowModCommand = OFFlowMod.OFPFC_ADD;
-		if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
-			flowModCommand = OFFlowMod.OFPFC_ADD;
-		} else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
-			flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
-		} else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
-			flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
-		} else {
-			// Unknown user state. Ignore the entry
-			log.debug(
-					"Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
-					flowEntry.flowEntryId(),
-					flowEntry.flowEntryUserState());
-			return false;
-		}
+                    return null;
+                }
+                case SUSPENDED: {
+                    // Only polling from high priority queue
+                    SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+                    return entry;
+                }
+                default:
+                    log.error("Unexpected QueueState : ", state);
+                    return null;
+            }
+        }
 
-		//
-		// Fetch the match conditions.
-		//
-		// NOTE: The Flow matching conditions common for all Flow Entries are
-		// used ONLY if a Flow Entry does NOT have the corresponding matching
-		// condition set.
-		//
-		OFMatch match = new OFMatch();
-		match.setWildcards(OFMatch.OFPFW_ALL);
-		FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+        /**
+         * Check if this object has any messages in the queues to be sent
+         *
+         * @return True if there are some messages to be sent.
+         */
+        boolean hasMessageToSend() {
+            switch (state) {
+                case READY:
+                    for (Queue<SwitchQueueEntry> queue : raw_queues) {
+                        if (!queue.isEmpty()) {
+                            return true;
+                        }
+                    }
+                    break;
+                case SUSPENDED:
+                    // Only checking high priority queue
+                    return (!getQueue(MsgPriority.HIGH).isEmpty());
+                default:
+                    log.error("Unexpected QueueState : ", state);
+                    return false;
+            }
 
-		// Match the Incoming Port
-		Port matchInPort = flowEntryMatch.inPort();
-		if (matchInPort != null) {
-			match.setInputPort(matchInPort.value());
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
-		}
+            return false;
+        }
 
-		// Match the Source MAC address
-		MACAddress matchSrcMac = flowEntryMatch.srcMac();
-		if (matchSrcMac != null) {
-			match.setDataLayerSource(matchSrcMac.toString());
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
-		}
+        Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+            return raw_queues.get(priority.ordinal());
+        }
+    }
 
-		// Match the Destination MAC address
-		MACAddress matchDstMac = flowEntryMatch.dstMac();
-		if (matchDstMac != null) {
-			match.setDataLayerDestination(matchDstMac.toString());
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
-		}
+    /**
+     * BarrierInfo holds information to specify barrier message sent to switch.
+     *
+     * @author Naoki
+     */
+    private static class BarrierInfo {
+        final long dpid;
+        final int xid;
 
-		// Match the Ethernet Frame Type
-		Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
-		if (matchEthernetFrameType != null) {
-			match.setDataLayerType(matchEthernetFrameType);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
-		}
+        static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+            return new BarrierInfo(sw.getId(), req.getXid());
+        }
 
-		// Match the VLAN ID
-		Short matchVlanId = flowEntryMatch.vlanId();
-		if (matchVlanId != null) {
-			match.setDataLayerVirtualLan(matchVlanId);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
-		}
+        static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+            return new BarrierInfo(sw.getId(), rpy.getXid());
+        }
 
-		// Match the VLAN priority
-		Byte matchVlanPriority = flowEntryMatch.vlanPriority();
-		if (matchVlanPriority != null) {
-			match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
-			match.setWildcards(match.getWildcards()
-					& ~OFMatch.OFPFW_DL_VLAN_PCP);
-		}
+        private BarrierInfo(long dpid, int xid) {
+            this.dpid = dpid;
+            this.xid = xid;
+        }
 
-		// Match the Source IPv4 Network prefix
-		IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
-		if (matchSrcIPv4Net != null) {
-			match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
-		}
+        // Auto generated code by Eclipse
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + (int) (dpid ^ (dpid >>> 32));
+            result = prime * result + xid;
+            return result;
+        }
 
-		// Natch the Destination IPv4 Network prefix
-		IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
-		if (matchDstIPv4Net != null) {
-			match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
-		}
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null)
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
 
-		// Match the IP protocol
-		Byte matchIpProto = flowEntryMatch.ipProto();
-		if (matchIpProto != null) {
-			match.setNetworkProtocol(matchIpProto);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
-		}
+            BarrierInfo other = (BarrierInfo) obj;
+            return (this.dpid == other.dpid) && (this.xid == other.xid);
+        }
 
-		// Match the IP ToS (DSCP field, 6 bits)
-		Byte matchIpToS = flowEntryMatch.ipToS();
-		if (matchIpToS != null) {
-			match.setNetworkTypeOfService(matchIpToS);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
-		}
 
-		// Match the Source TCP/UDP port
-		Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
-		if (matchSrcTcpUdpPort != null) {
-			match.setTransportSource(matchSrcTcpUdpPort);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
-		}
+    }
 
-		// Match the Destination TCP/UDP port
-		Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
-		if (matchDstTcpUdpPort != null) {
-			match.setTransportDestination(matchDstTcpUdpPort);
-			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
-		}
+    private OFMessageDamper messageDamper = null;
+    private IThreadPoolService threadPool = null;
 
-		//
-		// Fetch the actions
-		//
-		Short actionOutputPort = null;
-		List<OFAction> openFlowActions = new ArrayList<OFAction>();
-		int actionsLen = 0;
-		FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
-		//
-		for (FlowEntryAction action : flowEntryActions.actions()) {
-			ActionOutput actionOutput = action.actionOutput();
-			ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
-			ActionSetVlanPriority actionSetVlanPriority = action
-					.actionSetVlanPriority();
-			ActionStripVlan actionStripVlan = action.actionStripVlan();
-			ActionSetEthernetAddr actionSetEthernetSrcAddr = action
-					.actionSetEthernetSrcAddr();
-			ActionSetEthernetAddr actionSetEthernetDstAddr = action
-					.actionSetEthernetDstAddr();
-			ActionSetIPv4Addr actionSetIPv4SrcAddr = action
-					.actionSetIPv4SrcAddr();
-			ActionSetIPv4Addr actionSetIPv4DstAddr = action
-					.actionSetIPv4DstAddr();
-			ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
-			ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
-					.actionSetTcpUdpSrcPort();
-			ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
-					.actionSetTcpUdpDstPort();
-			ActionEnqueue actionEnqueue = action.actionEnqueue();
+    private FloodlightContext context = null;
+    private BasicFactory factory = null;
 
-			if (actionOutput != null) {
-				actionOutputPort = actionOutput.port().value();
-				// XXX: The max length is hard-coded for now
-				OFActionOutput ofa = new OFActionOutput(actionOutput.port()
-						.value(), (short) 0xffff);
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+    // Map of threads versus dpid
+    private Map<Long, FlowPusherThread> threadMap = null;
+    // Map from (DPID and transaction ID) to Future objects.
+    private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+            = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
 
-			if (actionSetVlanId != null) {
-				OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
-						actionSetVlanId.vlanId());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+    private int number_thread;
 
-			if (actionSetVlanPriority != null) {
-				OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
-						actionSetVlanPriority.vlanPriority());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+    /**
+     * Main thread that reads messages from queues and sends them to switches.
+     *
+     * @author Naoki Shiota
+     */
+    private class FlowPusherThread extends Thread {
+        private Map<IOFSwitch, SwitchQueue> assignedQueues
+                = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
 
-			if (actionStripVlan != null) {
-				if (actionStripVlan.stripVlan() == true) {
-					OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
-					openFlowActions.add(ofa);
-					actionsLen += ofa.getLength();
-				}
-			}
+        final Lock queuingLock = new ReentrantLock();
+        final Condition messagePushed = queuingLock.newCondition();
 
-			if (actionSetEthernetSrcAddr != null) {
-				OFActionDataLayerSource ofa = new OFActionDataLayerSource(
-						actionSetEthernetSrcAddr.addr().toBytes());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+        @Override
+        public void run() {
+            this.setName("FlowPusherThread " + this.getId());
+            while (true) {
+                while (!queuesHasMessageToSend()) {
+                    queuingLock.lock();
 
-			if (actionSetEthernetDstAddr != null) {
-				OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
-						actionSetEthernetDstAddr.addr().toBytes());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+                    try {
+                        // wait for message pushed to queue
+                        messagePushed.await();
+                    } catch (InterruptedException e) {
+                        // Interrupted to be shut down (not an error)
+                        log.debug("FlowPusherThread is interrupted");
+                        return;
+                    } finally {
+                        queuingLock.unlock();
+                    }
+                }
 
-			if (actionSetIPv4SrcAddr != null) {
-				OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
-						actionSetIPv4SrcAddr.addr().value());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+                // for safety of concurrent access, copy set of key objects
+                Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+                for (IOFSwitch sw : assignedQueues.keySet()) {
+                    keys.add(sw);
+                }
 
-			if (actionSetIPv4DstAddr != null) {
-				OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
-						actionSetIPv4DstAddr.addr().value());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+                for (IOFSwitch sw : keys) {
+                    SwitchQueue queue = assignedQueues.get(sw);
 
-			if (actionSetIpToS != null) {
-				OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
-						actionSetIpToS.ipToS());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+                    if (sw == null || queue == null) {
+                        continue;
+                    }
 
-			if (actionSetTcpUdpSrcPort != null) {
-				OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
-						actionSetTcpUdpSrcPort.port());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+                    synchronized (queue) {
+                        processQueue(sw, queue, MAX_MESSAGE_SEND);
+                        if (queue.toBeDeleted && !queue.hasMessageToSend()) {
+                            // remove queue if flagged to be.
+                            assignedQueues.remove(sw);
+                        }
+                    }
+                }
+            }
+        }
 
-			if (actionSetTcpUdpDstPort != null) {
-				OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
-						actionSetTcpUdpDstPort.port());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
+        /**
+         * Read messages from queue and send them to the switch.
+         * If number of messages excess the limit, stop sending messages.
+         *
+         * @param sw      Switch to which messages will be sent.
+         * @param queue   Queue of messages.
+         * @param max_msg Limitation of number of messages to be sent. If set to 0,
+         *                all messages in queue will be sent.
+         */
+        private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
+            // check sending rate and determine it to be sent or not
+            long current_time = System.currentTimeMillis();
+            long size = 0;
 
-			if (actionEnqueue != null) {
-				OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
-						.value(), actionEnqueue.queueId());
-				openFlowActions.add(ofa);
-				actionsLen += ofa.getLength();
-			}
-		}
+            if (queue.isSendable(current_time)) {
+                int i = 0;
+                while (queue.hasMessageToSend()) {
+                    // Number of messages excess the limit
+                    if (0 < max_msg && max_msg <= i) {
+                        break;
+                    }
+                    ++i;
 
-		fm.setIdleTimeout((short)flowEntry.idleTimeout())
-				.setHardTimeout((short)flowEntry.hardTimeout())
-				.setPriority((short)flowEntry.priority())
-				.setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
-				.setCommand(flowModCommand).setMatch(match)
-				.setActions(openFlowActions)
-				.setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
-		fm.setOutPort(OFPort.OFPP_NONE.getValue());
-		if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
-				|| (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
-			if (actionOutputPort != null)
-				fm.setOutPort(actionOutputPort);
-		}
+                    SwitchQueueEntry queueEntry;
+                    synchronized (queue) {
+                        queueEntry = queue.poll();
+                    }
 
-		//
-		// Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
-		// permanent.
-		//
-		if ((flowEntry.idleTimeout() != 0) ||
-		    (flowEntry.hardTimeout() != 0)) {
-		    fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
-		}
+                    OFMessage msg = queueEntry.getOFMessage();
+                    try {
+                        messageDamper.write(sw, msg, context);
+                        if (log.isTraceEnabled()) {
+                            log.trace("Pusher sends message : {}", msg);
+                        }
+                        size += msg.getLength();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        log.error("Exception in sending message ({}) : {}", msg, e);
+                    }
+                }
 
-		if (log.isTraceEnabled()) {
-			log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
-					, flowEntry.flowEntryUserState()
-					, sw.getStringId()
-					, flowEntry.flowEntryId()
-					, matchSrcMac
-					, matchDstMac
-					, matchInPort
-					, actionOutputPort
-					);
-		}
+                sw.flush();
+                queue.logSentData(current_time, size);
+            }
+        }
 
-		return addMessageImpl(sw, fm, priority);
-	}
-	
-	/**
-	 * Add message to queue
-	 * @param sw
-	 * @param msg
-	 * @param flowEntryId
-	 * @return
-	 */
-	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
-		FlowPusherThread thread = getProcessingThread(sw);
-		
-		SwitchQueue queue = getQueue(sw);
+        private boolean queuesHasMessageToSend() {
+            for (SwitchQueue queue : assignedQueues.values()) {
+                if (queue.hasMessageToSend()) {
+                    return true;
+                }
+            }
 
-		// create queue at first addition of message
-		if (queue == null) {
-			queue = createQueueImpl(sw);
-		}
-		
-		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+            return false;
+        }
 
-		synchronized (queue) {
-			queue.add(entry,priority);
-			if (log.isTraceEnabled()) {
-				log.trace("Message is pushed : {}", entry.getOFMessage());
-			}
-		}
-		
-		thread.notifyMessagePushed();
+        private void notifyMessagePushed() {
+            queuingLock.lock();
+            try {
+                messagePushed.signal();
+            } finally {
+                queuingLock.unlock();
+            }
+        }
+    }
 
-		return true;
-	}
-	
-	@Override
-	public OFBarrierReply barrier(IOFSwitch sw) {
-		OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
-		if (future == null) {
-			return null;
-		}
-		
-		try {
-			return future.get();
-		} catch (InterruptedException e) {
-			e.printStackTrace();
-			log.error("InterruptedException: {}", e);
-			return null;
-		} catch (ExecutionException e) {
-			e.printStackTrace();
-			log.error("ExecutionException: {}", e);
-			return null;
-		}
-	}
+    /**
+     * Initialize object with one thread.
+     */
+    public FlowPusher() {
+        number_thread = DEFAULT_NUMBER_THREAD;
+    }
 
-	@Override
-	public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
-		// TODO creation of message and future should be moved to OFSwitchImpl
+    /**
+     * Initialize object with threads of given number.
+     *
+     * @param number_thread Number of threads to handle messages.
+     */
+    public FlowPusher(int number_thread) {
+        if (number_thread > 0) {
+            this.number_thread = number_thread;
+        } else {
+            this.number_thread = DEFAULT_NUMBER_THREAD;
+        }
+    }
 
-		if (sw == null) {
-			return null;
-		}
-		
-		OFBarrierRequest msg = createBarrierRequest(sw);
-		
-		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-		barrierFutures.put(BarrierInfo.create(sw,msg), future);
-		
-		addMessageImpl(sw, msg, MsgPriority.NORMAL);
-		
-		return future;
-	}
+    /**
+     * Set parameters needed for sending messages.
+     *
+     * @param context    FloodlightContext used for sending messages.
+     *                   If null, FlowPusher uses default context.
+     * @param modContext FloodlightModuleContext used for acquiring
+     *                   ThreadPoolService and registering MessageListener.
+     * @param factory    Factory object to create OFMessage objects.
+     * @param damper     Message damper used for sending messages.
+     *                   If null, FlowPusher creates its own damper object.
+     */
+    public void init(FloodlightContext context,
+                     FloodlightModuleContext modContext,
+                     BasicFactory factory,
+                     OFMessageDamper damper) {
+        this.context = context;
+        this.factory = factory;
+        this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+        IFloodlightProviderService flservice
+                = modContext.getServiceImpl(IFloodlightProviderService.class);
+        flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
 
-	protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
-		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
-		msg.setXid(sw.getNextTransactionId());
-		
-		return msg;
-	}
-	
-	/**
-	 * Get a queue attached to a switch.
-	 * @param sw Switch object
-	 * @return Queue object
-	 */
-	protected SwitchQueue getQueue(IOFSwitch sw) {
-		if (sw == null)  {
-			return null;
-		}
-		
-		FlowPusherThread th = getProcessingThread(sw);
-		if (th == null) {
-			return null;
-		}
-		
-		return th.assignedQueues.get(sw);
-	}
-	
-	/**
-	 * Get a hash value correspondent to a switch.
-	 * @param sw Switch object
-	 * @return Hash value
-	 */
-	protected long getHash(IOFSwitch sw) {
-		// This code assumes DPID is sequentially assigned.
-		// TODO consider equalization algorithm
-		return sw.getId() % number_thread;
-	}
+        if (damper != null) {
+            messageDamper = damper;
+        } else {
+            // use default values
+            messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+                    EnumSet.of(OFType.FLOW_MOD),
+                    OFMESSAGE_DAMPER_TIMEOUT);
+        }
+    }
 
-	/**
-	 * Get a Thread object which processes the queue attached to a switch.
-	 * @param sw Switch object
-	 * @return Thread object
-	 */
-	protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
-		long hash = getHash(sw);
-		
-		return threadMap.get(hash);
-	}
-	
-	@Override
-	public String getName() {
-		return "flowpusher";
-	}
+    /**
+     * Begin processing queue.
+     */
+    public void start() {
+        if (factory == null) {
+            log.error("FlowPusher not yet initialized.");
+            return;
+        }
 
-	@Override
-	public boolean isCallbackOrderingPrereq(OFType type, String name) {
-		return false;
-	}
+        threadMap = new HashMap<Long, FlowPusherThread>();
+        for (long i = 0; i < number_thread; ++i) {
+            FlowPusherThread thread = new FlowPusherThread();
 
-	@Override
-	public boolean isCallbackOrderingPostreq(OFType type, String name) {
-		return false;
-	}
+            threadMap.put(i, thread);
+            thread.start();
+        }
+    }
 
-	@Override
-	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
-		if (log.isTraceEnabled()) {
-			log.trace("Received BARRIER_REPLY from : {}", sw.getId());
-		}
+    @Override
+    public boolean suspend(IOFSwitch sw) {
+        SwitchQueue queue = getQueue(sw);
 
-		if (msg.getType() != OFType.BARRIER_REPLY) {
-			log.error("Unexpected reply message : {}", msg.getType());
-			return Command.CONTINUE;
-		}
-		
-		OFBarrierReply reply = (OFBarrierReply) msg;
-		BarrierInfo info = BarrierInfo.create(sw,reply);
-		
-		// Deliver future if exists
-		OFBarrierReplyFuture future = barrierFutures.get(info);
-		if (future != null) {
-			future.deliverFuture(sw, msg);
-			barrierFutures.remove(info);
-		}
-		
-		return Command.CONTINUE;
-	}
+        if (queue == null) {
+            // create queue in case suspend is called before first message addition
+            queue = createQueueImpl(sw);
+        }
+
+        synchronized (queue) {
+            if (queue.state == QueueState.READY) {
+                queue.state = QueueState.SUSPENDED;
+                return true;
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public boolean resume(IOFSwitch sw) {
+        SwitchQueue queue = getQueue(sw);
+
+        if (queue == null) {
+            log.error("No queue is attached to DPID : {}", sw.getId());
+            return false;
+        }
+
+        synchronized (queue) {
+            if (queue.state == QueueState.SUSPENDED) {
+                queue.state = QueueState.READY;
+
+                // Free the latch if queue has any messages
+                FlowPusherThread thread = getProcessingThread(sw);
+                if (queue.hasMessageToSend()) {
+                    thread.notifyMessagePushed();
+                }
+                return true;
+            }
+            return false;
+        }
+    }
+
+    @Override
+    public QueueState getState(IOFSwitch sw) {
+        SwitchQueue queue = getQueue(sw);
+
+        if (queue == null) {
+            return QueueState.UNKNOWN;
+        }
+
+        return queue.state;
+    }
+
+    /**
+     * Stop processing queue and exit thread.
+     */
+    public void stop() {
+        if (threadMap == null) {
+            return;
+        }
+
+        for (FlowPusherThread t : threadMap.values()) {
+            t.interrupt();
+        }
+    }
+
+    @Override
+    public void setRate(IOFSwitch sw, long rate) {
+        SwitchQueue queue = getQueue(sw);
+        if (queue == null) {
+            queue = createQueueImpl(sw);
+        }
+
+        if (rate > 0) {
+            log.debug("rate for {} is set to {}", sw.getId(), rate);
+            synchronized (queue) {
+                queue.max_rate = rate;
+            }
+        }
+    }
+
+    @Override
+    public boolean createQueue(IOFSwitch sw) {
+        SwitchQueue queue = createQueueImpl(sw);
+
+        return (queue != null);
+    }
+
+    protected SwitchQueue createQueueImpl(IOFSwitch sw) {
+        SwitchQueue queue = getQueue(sw);
+        if (queue != null) {
+            return queue;
+        }
+
+        FlowPusherThread proc = getProcessingThread(sw);
+        queue = new SwitchQueue();
+        queue.state = QueueState.READY;
+        proc.assignedQueues.put(sw, queue);
+
+        return queue;
+    }
+
+    @Override
+    public boolean deleteQueue(IOFSwitch sw) {
+        return deleteQueue(sw, false);
+    }
+
+    @Override
+    public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+        FlowPusherThread proc = getProcessingThread(sw);
+
+        if (forceStop) {
+            SwitchQueue queue = proc.assignedQueues.remove(sw);
+            if (queue == null) {
+                return false;
+            }
+            return true;
+        } else {
+            SwitchQueue queue = getQueue(sw);
+            if (queue == null) {
+                return false;
+            }
+            synchronized (queue) {
+                queue.toBeDeleted = true;
+            }
+            return true;
+        }
+    }
+
+    @Override
+    public boolean add(IOFSwitch sw, OFMessage msg) {
+        return add(sw, msg, MsgPriority.NORMAL);
+    }
+
+    @Override
+    public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+        return addMessageImpl(sw, msg, priority);
+    }
+
+    @Override
+    public void pushFlowEntries(
+            Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+        pushFlowEntries(entries, MsgPriority.NORMAL);
+    }
+
+    @Override
+    public void pushFlowEntries(
+            Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+
+        for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+            add(entry.first, entry.second, priority);
+        }
+    }
+
+    @Override
+    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+        pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+    }
+
+    @Override
+    public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+        Collection<Pair<IOFSwitch, FlowEntry>> entries =
+                new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+
+        entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
+        pushFlowEntries(entries, priority);
+    }
+
+    /**
+     * Create a message from FlowEntry and add it to the queue of the switch.
+     *
+     * @param sw        Switch to which message is pushed.
+     * @param flowEntry FlowEntry object used for creating message.
+     * @return true if message is successfully added to a queue.
+     */
+    private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+        //
+        // Create the OpenFlow Flow Modification Entry to push
+        //
+        OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+        long cookie = flowEntry.flowEntryId().value();
+
+        short flowModCommand = OFFlowMod.OFPFC_ADD;
+        if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+            flowModCommand = OFFlowMod.OFPFC_ADD;
+        } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+            flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+        } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+            flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+        } else {
+            // Unknown user state. Ignore the entry
+            log.debug(
+                    "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+                    flowEntry.flowEntryId(),
+                    flowEntry.flowEntryUserState());
+            return false;
+        }
+
+        //
+        // Fetch the match conditions.
+        //
+        // NOTE: The Flow matching conditions common for all Flow Entries are
+        // used ONLY if a Flow Entry does NOT have the corresponding matching
+        // condition set.
+        //
+        OFMatch match = new OFMatch();
+        match.setWildcards(OFMatch.OFPFW_ALL);
+        FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+        // Match the Incoming Port
+        Port matchInPort = flowEntryMatch.inPort();
+        if (matchInPort != null) {
+            match.setInputPort(matchInPort.value());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+        }
+
+        // Match the Source MAC address
+        MACAddress matchSrcMac = flowEntryMatch.srcMac();
+        if (matchSrcMac != null) {
+            match.setDataLayerSource(matchSrcMac.toString());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+        }
+
+        // Match the Destination MAC address
+        MACAddress matchDstMac = flowEntryMatch.dstMac();
+        if (matchDstMac != null) {
+            match.setDataLayerDestination(matchDstMac.toString());
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+        }
+
+        // Match the Ethernet Frame Type
+        Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+        if (matchEthernetFrameType != null) {
+            match.setDataLayerType(matchEthernetFrameType);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+        }
+
+        // Match the VLAN ID
+        Short matchVlanId = flowEntryMatch.vlanId();
+        if (matchVlanId != null) {
+            match.setDataLayerVirtualLan(matchVlanId);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+        }
+
+        // Match the VLAN priority
+        Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+        if (matchVlanPriority != null) {
+            match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+            match.setWildcards(match.getWildcards()
+                    & ~OFMatch.OFPFW_DL_VLAN_PCP);
+        }
+
+        // Match the Source IPv4 Network prefix
+        IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+        if (matchSrcIPv4Net != null) {
+            match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+        }
+
+        // Natch the Destination IPv4 Network prefix
+        IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+        if (matchDstIPv4Net != null) {
+            match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+        }
+
+        // Match the IP protocol
+        Byte matchIpProto = flowEntryMatch.ipProto();
+        if (matchIpProto != null) {
+            match.setNetworkProtocol(matchIpProto);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+        }
+
+        // Match the IP ToS (DSCP field, 6 bits)
+        Byte matchIpToS = flowEntryMatch.ipToS();
+        if (matchIpToS != null) {
+            match.setNetworkTypeOfService(matchIpToS);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+        }
+
+        // Match the Source TCP/UDP port
+        Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+        if (matchSrcTcpUdpPort != null) {
+            match.setTransportSource(matchSrcTcpUdpPort);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+        }
+
+        // Match the Destination TCP/UDP port
+        Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+        if (matchDstTcpUdpPort != null) {
+            match.setTransportDestination(matchDstTcpUdpPort);
+            match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+        }
+
+        //
+        // Fetch the actions
+        //
+        Short actionOutputPort = null;
+        List<OFAction> openFlowActions = new ArrayList<OFAction>();
+        int actionsLen = 0;
+        FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+        //
+        for (FlowEntryAction action : flowEntryActions.actions()) {
+            ActionOutput actionOutput = action.actionOutput();
+            ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+            ActionSetVlanPriority actionSetVlanPriority = action
+                    .actionSetVlanPriority();
+            ActionStripVlan actionStripVlan = action.actionStripVlan();
+            ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+                    .actionSetEthernetSrcAddr();
+            ActionSetEthernetAddr actionSetEthernetDstAddr = action
+                    .actionSetEthernetDstAddr();
+            ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+                    .actionSetIPv4SrcAddr();
+            ActionSetIPv4Addr actionSetIPv4DstAddr = action
+                    .actionSetIPv4DstAddr();
+            ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+            ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+                    .actionSetTcpUdpSrcPort();
+            ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+                    .actionSetTcpUdpDstPort();
+            ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+            if (actionOutput != null) {
+                actionOutputPort = actionOutput.port().value();
+                // XXX: The max length is hard-coded for now
+                OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+                        .value(), (short) 0xffff);
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetVlanId != null) {
+                OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+                        actionSetVlanId.vlanId());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetVlanPriority != null) {
+                OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+                        actionSetVlanPriority.vlanPriority());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionStripVlan != null) {
+                if (actionStripVlan.stripVlan() == true) {
+                    OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+                    openFlowActions.add(ofa);
+                    actionsLen += ofa.getLength();
+                }
+            }
+
+            if (actionSetEthernetSrcAddr != null) {
+                OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+                        actionSetEthernetSrcAddr.addr().toBytes());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetEthernetDstAddr != null) {
+                OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+                        actionSetEthernetDstAddr.addr().toBytes());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIPv4SrcAddr != null) {
+                OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+                        actionSetIPv4SrcAddr.addr().value());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIPv4DstAddr != null) {
+                OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+                        actionSetIPv4DstAddr.addr().value());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetIpToS != null) {
+                OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+                        actionSetIpToS.ipToS());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetTcpUdpSrcPort != null) {
+                OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+                        actionSetTcpUdpSrcPort.port());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionSetTcpUdpDstPort != null) {
+                OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+                        actionSetTcpUdpDstPort.port());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+
+            if (actionEnqueue != null) {
+                OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+                        .value(), actionEnqueue.queueId());
+                openFlowActions.add(ofa);
+                actionsLen += ofa.getLength();
+            }
+        }
+
+        fm.setIdleTimeout((short) flowEntry.idleTimeout())
+                .setHardTimeout((short) flowEntry.hardTimeout())
+                .setPriority((short) flowEntry.priority())
+                .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+                .setCommand(flowModCommand).setMatch(match)
+                .setActions(openFlowActions)
+                .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+        fm.setOutPort(OFPort.OFPP_NONE.getValue());
+        if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+                || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+            if (actionOutputPort != null)
+                fm.setOutPort(actionOutputPort);
+        }
+
+        //
+        // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
+        // permanent.
+        //
+        if ((flowEntry.idleTimeout() != 0) ||
+                (flowEntry.hardTimeout() != 0)) {
+            fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+        }
+
+        if (log.isTraceEnabled()) {
+            log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+                    , flowEntry.flowEntryUserState()
+                    , sw.getStringId()
+                    , flowEntry.flowEntryId()
+                    , matchSrcMac
+                    , matchDstMac
+                    , matchInPort
+                    , actionOutputPort
+            );
+        }
+
+        return addMessageImpl(sw, fm, priority);
+    }
+
+    /**
+     * Add message to queue
+     *
+     * @param sw
+     * @param msg
+     * @param flowEntryId
+     * @return
+     */
+    protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+        FlowPusherThread thread = getProcessingThread(sw);
+
+        SwitchQueue queue = getQueue(sw);
+
+        // create queue at first addition of message
+        if (queue == null) {
+            queue = createQueueImpl(sw);
+        }
+
+        SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
+        synchronized (queue) {
+            queue.add(entry, priority);
+            if (log.isTraceEnabled()) {
+                log.trace("Message is pushed : {}", entry.getOFMessage());
+            }
+        }
+
+        thread.notifyMessagePushed();
+
+        return true;
+    }
+
+    @Override
+    public OFBarrierReply barrier(IOFSwitch sw) {
+        OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+        if (future == null) {
+            return null;
+        }
+
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+            log.error("InterruptedException: {}", e);
+            return null;
+        } catch (ExecutionException e) {
+            e.printStackTrace();
+            log.error("ExecutionException: {}", e);
+            return null;
+        }
+    }
+
+    @Override
+    public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+        // TODO creation of message and future should be moved to OFSwitchImpl
+
+        if (sw == null) {
+            return null;
+        }
+
+        OFBarrierRequest msg = createBarrierRequest(sw);
+
+        OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+        barrierFutures.put(BarrierInfo.create(sw, msg), future);
+
+        addMessageImpl(sw, msg, MsgPriority.NORMAL);
+
+        return future;
+    }
+
+    protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+        OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+        msg.setXid(sw.getNextTransactionId());
+
+        return msg;
+    }
+
+    /**
+     * Get a queue attached to a switch.
+     *
+     * @param sw Switch object
+     * @return Queue object
+     */
+    protected SwitchQueue getQueue(IOFSwitch sw) {
+        if (sw == null) {
+            return null;
+        }
+
+        FlowPusherThread th = getProcessingThread(sw);
+        if (th == null) {
+            return null;
+        }
+
+        return th.assignedQueues.get(sw);
+    }
+
+    /**
+     * Get a hash value correspondent to a switch.
+     *
+     * @param sw Switch object
+     * @return Hash value
+     */
+    protected long getHash(IOFSwitch sw) {
+        // This code assumes DPID is sequentially assigned.
+        // TODO consider equalization algorithm
+        return sw.getId() % number_thread;
+    }
+
+    /**
+     * Get a Thread object which processes the queue attached to a switch.
+     *
+     * @param sw Switch object
+     * @return Thread object
+     */
+    protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
+        long hash = getHash(sw);
+
+        return threadMap.get(hash);
+    }
+
+    @Override
+    public String getName() {
+        return "flowpusher";
+    }
+
+    @Override
+    public boolean isCallbackOrderingPrereq(OFType type, String name) {
+        return false;
+    }
+
+    @Override
+    public boolean isCallbackOrderingPostreq(OFType type, String name) {
+        return false;
+    }
+
+    @Override
+    public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+        if (log.isTraceEnabled()) {
+            log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+        }
+
+        if (msg.getType() != OFType.BARRIER_REPLY) {
+            log.error("Unexpected reply message : {}", msg.getType());
+            return Command.CONTINUE;
+        }
+
+        OFBarrierReply reply = (OFBarrierReply) msg;
+        BarrierInfo info = BarrierInfo.create(sw, reply);
+
+        // Deliver future if exists
+        OFBarrierReplyFuture future = barrierFutures.get(info);
+        if (future != null) {
+            future.deliverFuture(sw, msg);
+            barrierFutures.remove(info);
+        }
+
+        return Command.CONTINUE;
+    }
 }