Cherry-pick from https://gerrit.onos.onlab.us/#/c/339/

Cherry-pick from https://gerrit.onos.onlab.us/#/c/67

Implemented a bypass function in FlowPusher so that FlowSynchronizer can
push flows while the queue is suspended (ONOS-967).

In this commit I modified FlowPusher to have two (or more, in the future)
queues per switch so that messages can be prioritized.
High priority queue is for flow synchronization. This queue won't be
blocked even in SUSPENDED status.
Low priority queue is for normal OpenFlow message installation. This
queue is processed only in READY status and is blocked in SUSPENDED status.
Messages in high priority queue are always installed before messages in
low priority queue. In other words, messages in low priority queue are
installed only if high priority queue is empty.

Moved timing of FlowEntryAdded notification.

Change-Id: Ic7ee4ce002c1f6b50af4f160fbf98eb82658ee8b

Implemented a bypass function in FlowPusher.

Change-Id: Ia3ae86242ceed34257c58e9aaffcdd8f72534bf3

Fixed a bug where FlowPusher#hasMessageToSend doesn't work in SUSPENDED status.

Conflicts:
        src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java

NOTE: The above conflict was resolved by hand.

Change-Id: I79446dced030ecd91a6e81333cb93c1c4b637914
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index d2af973..d3476c7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -10,6 +10,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -54,6 +55,7 @@
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+    protected static final int DEFAULT_NUMBER_THREAD = 1;
 
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
@@ -65,11 +67,6 @@
     // Number of messages sent to switch at once
     protected static final int MAX_MESSAGE_SEND = 100;
 
-    public enum QueueState {
-		READY,
-		SUSPENDED,
-	}
-
 	private static class SwitchQueueEntry {
 		OFMessage msg;
 		
@@ -88,9 +85,8 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
-		private static final long serialVersionUID = 1L;
-
+	private class SwitchQueue {
+		List<Queue<SwitchQueueEntry>> raw_queues;
 		QueueState state;
 		
 		// Max rate of sending message (bytes/ms). 0 implies no limitation.
@@ -101,6 +97,16 @@
 		// "To be deleted" flag
 		boolean toBeDeleted = false;
 		
+		SwitchQueue() {
+			final int priorityCount = MsgPriority.values().length;
+			// One raw queue per MsgPriority value, stored at that value's ordinal index.
+			raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(priorityCount);
+			for (int slot = 0; slot < priorityCount; ++slot) {
+				raw_queues.add(new ArrayDeque<SwitchQueueEntry>());
+			}
+			state = QueueState.READY;
+		}
+		
 		/**
 		 * Check if sending rate is within the rate
 		 * @param current Current time
@@ -130,6 +136,72 @@
 			last_sent_time = current;
 			last_sent_size = size;
 		}
+		/** Enqueues entry on the raw queue selected by priority; returns false if no queue is found. */
+		boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+			Queue<SwitchQueueEntry> queue = getQueue(priority);
+			if (queue == null) {
+				log.error("Unexpected priority : {}", priority);
+				return false;
+			}
+			return queue.add(entry);
+		}
+		
+		/**
+		 * Poll one entry: READY scans raw_queues in index order, SUSPENDED polls only the HIGH queue.
+		 * @return Polled entry, or null if no eligible queue has one or the state is unexpected.
+		 */
+		SwitchQueueEntry poll() {
+			switch (state) {
+			case READY:
+			{
+				for (int i = 0; i < raw_queues.size(); ++i) {
+					SwitchQueueEntry entry = raw_queues.get(i).poll();
+					if (entry != null) {
+						return entry;
+					}
+				}
+				
+				return null;
+			}
+			case SUSPENDED:
+			{
+				// Only polling from high priority queue
+				SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+				return entry;
+			}
+			default:
+				log.error("Unexpected QueueState : {}", state);
+				return null;
+			}
+		}
+		
+		/**
+		 * Check for sendable messages: READY looks at every raw queue, SUSPENDED only at the HIGH queue.
+		 * @return True if a queue eligible in the current state is non-empty; false otherwise.
+		 */
+		boolean hasMessageToSend() {
+			switch (state) {
+			case READY:
+				for (Queue<SwitchQueueEntry> queue : raw_queues) {
+					if (! queue.isEmpty()) {
+						return true;
+					}
+				}
+				break;
+			case SUSPENDED:
+				// Only checking high priority queue
+				return (! getQueue(MsgPriority.HIGH).isEmpty());
+			default:
+				log.error("Unexpected QueueState : {}", state);
+				return false;
+			}
+			
+			return false;
+		}
+		/** Looks up the raw queue stored at index priority.ordinal() of raw_queues. */
+		Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+			return raw_queues.get(priority.ordinal());
+		}
 	}
 	
 	/**
@@ -191,7 +263,7 @@
 	private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
 		= new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
 	
-	private int number_thread = 1;
+	private int number_thread;
 	
 	/**
 	 * Main thread that reads messages from queues and sends them to switches.
@@ -213,12 +285,12 @@
 					// wait for message pushed to queue
 					mutex.acquire();
 				} catch (InterruptedException e) {
-					// not an error
+					// Interrupted to be shut down (not an error)
 					log.debug("FlowPusherThread is interrupted");
 					return;
 				}
 				
-				// for safety of concurrent access, copy all key objects
+				// for safety of concurrent access, copy set of key objects
 				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
 				for (IOFSwitch sw : queues.keySet()) {
 					keys.add(sw);
@@ -227,21 +299,19 @@
 				for (IOFSwitch sw : keys) {
 					SwitchQueue queue = queues.get(sw);
 
-					// Skip if queue is suspended
-					if (sw == null || queue == null ||
-							queue.state != QueueState.READY) {
+					if (sw == null || queue == null) {
 						continue;
 					}
 					
 					synchronized (queue) {
 						processQueue(sw, queue, MAX_MESSAGE_SEND);
-						if (queue.isEmpty()) {
+						if (! queue.hasMessageToSend()) {
 							// remove queue if flagged to be.
 							if (queue.toBeDeleted) {
 								queues.remove(sw);
 							}
 						} else {
-							// if some messages remains in queue, latch down
+							// Free the latch if message remains in queue
 							if (mutex.availablePermits() == 0) {
 								mutex.release();
 							}
@@ -266,7 +336,7 @@
 			
 			if (queue.isSendable(current_time)) {
 				int i = 0;
-				while (! queue.isEmpty()) {
+				while (queue.hasMessageToSend()) {
 					// Number of messages excess the limit
 					if (0 < max_msg && max_msg <= i) {
 						break;
@@ -301,6 +371,7 @@
 	 * Initialize object with one thread.
 	 */
 	public FlowPusher() {
+		number_thread = DEFAULT_NUMBER_THREAD;
 	}
 	
 	/**
@@ -308,7 +379,11 @@
 	 * @param number_thread Number of threads to handle messages.
 	 */
 	public FlowPusher(int number_thread) {
-		this.number_thread = number_thread;
+		if (number_thread > 0) {
+			this.number_thread = number_thread;
+		} else {
+			this.number_thread = DEFAULT_NUMBER_THREAD;
+		}
 	}
 	
 	/**
@@ -364,7 +439,8 @@
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
-			return false;
+			// create queue in case suspend is called before first message addition
+			queue = createQueueImpl(sw);
 		}
 		
 		synchronized (queue) {
@@ -381,6 +457,7 @@
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
+			log.error("No queue is attached to DPID : {}", sw.getId());
 			return false;
 		}
 		
@@ -388,9 +465,9 @@
 			if (queue.state == QueueState.SUSPENDED) {
 				queue.state = QueueState.READY;
 				
-				// Latch down if queue is not empty
-				FlowPusherThread thread = getProcess(sw);
-				if (! queue.isEmpty() &&
+				// Free the latch if queue has any messages
+				FlowPusherThread thread = getProcessingThread(sw);
+				if (queue.hasMessageToSend() &&
 						thread.mutex.availablePermits() == 0) {
 					thread.mutex.release();
 				}
@@ -401,15 +478,14 @@
 	}
 	
 	@Override
-	public boolean isSuspended(IOFSwitch sw) {
+	public QueueState getState(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
-			// TODO Is true suitable for this case?
-			return true;
+			return QueueState.UNKNOWN;
 		}
 		
-		return (queue.state == QueueState.SUSPENDED);
+		return queue.state;
 	}
 
 	/**
@@ -429,7 +505,7 @@
 	public void setRate(IOFSwitch sw, long rate) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue == null) {
-			return;
+			queue = createQueueImpl(sw);
 		}
 		
 		if (rate > 0) {
@@ -442,17 +518,23 @@
 
 	@Override
 	public boolean createQueue(IOFSwitch sw) {
+		SwitchQueue queue = createQueueImpl(sw);
+		
+		return (queue != null);
+	}
+	
+	protected SwitchQueue createQueueImpl(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue != null) {
-			return false;
+			return queue;
 		}
 		
-		FlowPusherThread proc = getProcess(sw);
+		FlowPusherThread proc = getProcessingThread(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
 		proc.queues.put(sw, queue);
 		
-		return true;
+		return queue;
 	}
 
 	@Override
@@ -462,7 +544,7 @@
 	
 	@Override
 	public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
-		FlowPusherThread proc = getProcess(sw);
+		FlowPusherThread proc = getProcessingThread(sw);
 		
 		if (forceStop) {
 			SwitchQueue queue = proc.queues.remove(sw);
@@ -484,34 +566,50 @@
 	
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		return addMessageImpl(sw, msg);
+		return add(sw, msg, MsgPriority.NORMAL);
+	}
+	
+	@Override
+	public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+		return addMessageImpl(sw, msg, priority);
 	}
 	
 	@Override
 	public void pushFlowEntries(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+		pushFlowEntries(entries, MsgPriority.NORMAL);
+	}
+	
+	@Override
+	public void pushFlowEntries(
+		Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
 
 		for (Pair<IOFSwitch, FlowEntry> entry : entries) {
-			add(entry.first, entry.second);
+			add(entry.first, entry.second, priority);
 		}
 	}
 
 	@Override
 	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+	    pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+	}
+
+	@Override
+	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
 	    Collection<Pair<IOFSwitch, FlowEntry>> entries = 
 		new LinkedList<Pair<IOFSwitch, FlowEntry>>();
 
 	    entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
-	    pushFlowEntries(entries);
+	    pushFlowEntries(entries, priority);
 	}
-
+	
 	/**
 	 * Create a message from FlowEntry and add it to the queue of the switch.
 	 * @param sw Switch to which message is pushed.
 	 * @param flowEntry FlowEntry object used for creating message.
 	 * @return true if message is successfully added to a queue.
 	 */
-	private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
+	private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
 		//
 		// Create the OpenFlow Flow Modification Entry to push
 		//
@@ -768,20 +866,19 @@
 		    fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
 		}
 
-		//
-		// Write the message to the switch
-		//
-		log.debug("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
-			, flowEntry.flowEntryUserState()
-			, sw.getStringId()
-			, flowEntry.flowEntryId()
-			, matchSrcMac
-			, matchDstMac
-			, matchInPort
-			, actionOutputPort
-			);
+		if (log.isTraceEnabled()) {
+			log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+					, flowEntry.flowEntryUserState()
+					, sw.getStringId()
+					, flowEntry.flowEntryId()
+					, matchSrcMac
+					, matchDstMac
+					, matchInPort
+					, actionOutputPort
+					);
+		}
 
-		return addMessageImpl(sw, fm);
+		return addMessageImpl(sw, fm, priority);
 	}
 	
 	/**
@@ -791,19 +888,18 @@
 	 * @param flowEntryId
 	 * @return
 	 */
-	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
-		FlowPusherThread proc = getProcess(sw);
-		SwitchQueue queue = proc.queues.get(sw);
+	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+		FlowPusherThread proc = getProcessingThread(sw);
+		SwitchQueue queue = getQueue(sw);
 
 		// create queue at first addition of message
 		if (queue == null) {
-			createQueue(sw);
-			queue = getQueue(sw);
+			queue = createQueueImpl(sw);
 		}
 		
 		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
 		synchronized (queue) {
-			queue.add(entry);
+			queue.add(entry,priority);
 			if (log.isTraceEnabled()) {
 				log.trace("Message is pushed : {}", entry.getOFMessage());
 			}
@@ -849,7 +945,7 @@
 		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
 		barrierFutures.put(BarrierInfo.create(sw,msg), future);
 		
-		addMessageImpl(sw, msg);
+		addMessageImpl(sw, msg, MsgPriority.NORMAL);
 		
 		return future;
 	}
@@ -871,7 +967,12 @@
 			return null;
 		}
 		
-		return getProcess(sw).queues.get(sw);
+		FlowPusherThread th = getProcessingThread(sw);
+		if (th == null) {
+			return null;
+		}
+		
+		return th.queues.get(sw);
 	}
 	
 	/**
@@ -890,7 +991,7 @@
 	 * @param sw Switch object
 	 * @return Thread object
 	 */
-	protected FlowPusherThread getProcess(IOFSwitch sw) {
+	protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
 		return threadMap.get(hash);