Cherry-pick from https://gerrit.onos.onlab.us/#/c/339/

Cherry-pick from https://gerrit.onos.onlab.us/#/c/67

Implemented bypassing function to FlowPusher so that FlowSynchronizer can
push flows while queue is suspended (ONOS-967).

In this commit I modified FlowPusher to have two (or more, in future)
queues to prioritize messages.
High priority queue is for flow synchronization. This queue won't be
blocked even in SUSPENDED status.
Low priority queue is for normal OpenFlow message installation. This
queue works only in READY status, but blocked in SUSPENDED status.
Messages in high priority queue are always installed before messages in
low priority queue. In other words, messages in low priority queue are
installed only if high priority queue is empty.

Moved timing of FlowEntryAdded notification.

Change-Id: Ic7ee4ce002c1f6b50af4f160fbf98eb82658ee8b

Implemented bypassing function to FlowPusher.

Change-Id: Ia3ae86242ceed34257c58e9aaffcdd8f72534bf3

Fixed a bug FlowPusher#hasMessageToSend doesn't work with SUSPENDED status.

Conflicts:
        src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java

NOTE: The above conflict was resolved by hand.

Change-Id: I79446dced030ecd91a6e81333cb93c1c4b637914
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index d2af973..d3476c7 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -10,6 +10,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -54,6 +55,7 @@
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+    protected static final int DEFAULT_NUMBER_THREAD = 1;
 
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
@@ -65,11 +67,6 @@
     // Number of messages sent to switch at once
     protected static final int MAX_MESSAGE_SEND = 100;
 
-    public enum QueueState {
-		READY,
-		SUSPENDED,
-	}
-
 	private static class SwitchQueueEntry {
 		OFMessage msg;
 		
@@ -88,9 +85,8 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
-		private static final long serialVersionUID = 1L;
-
+	private class SwitchQueue {
+		List<Queue<SwitchQueueEntry>> raw_queues;
 		QueueState state;
 		
 		// Max rate of sending message (bytes/ms). 0 implies no limitation.
@@ -101,6 +97,16 @@
 		// "To be deleted" flag
 		boolean toBeDeleted = false;
 		
+		SwitchQueue() {
+			raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+					 MsgPriority.values().length);
+			for (int i = 0; i < MsgPriority.values().length; ++i) {
+				raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+			}
+			
+			state = QueueState.READY;
+		}
+		
 		/**
 		 * Check if sending rate is within the rate
 		 * @param current Current time
@@ -130,6 +136,72 @@
 			last_sent_time = current;
 			last_sent_size = size;
 		}
+		
+		boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+			Queue<SwitchQueueEntry> queue = getQueue(priority);
+			if (queue == null) {
+				log.error("Unexpected priority : ", priority);
+				return false;
+			}
+			return queue.add(entry);
+		}
+		
+		/**
+		 * Poll single appropriate entry object according to QueueState.
+		 * @return Entry object.
+		 */
+		SwitchQueueEntry poll() {
+			switch (state) {
+			case READY:
+			{
+				for (int i = 0; i < raw_queues.size(); ++i) {
+					SwitchQueueEntry entry = raw_queues.get(i).poll();
+					if (entry != null) {
+						return entry;
+					}
+				}
+				
+				return null;
+			}
+			case SUSPENDED:
+			{
+				// Only polling from high priority queue
+				SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+				return entry;
+			}
+			default:
+				log.error("Unexpected QueueState : ", state);
+				return null;
+			}
+		}
+		
+		/**
+		 * Check if this object has any messages in the queues to be sent
+		 * @return True if there are some messages to be sent.
+		 */
+		boolean hasMessageToSend() {
+			switch (state) {
+			case READY:
+				for (Queue<SwitchQueueEntry> queue : raw_queues) {
+					if (! queue.isEmpty()) {
+						return true;
+					}
+				}
+			break;
+			case SUSPENDED:
+				// Only checking high priority queue
+				return (! getQueue(MsgPriority.HIGH).isEmpty());
+			default:
+				log.error("Unexpected QueueState : ", state);
+				return false;
+			}
+			
+			return false;
+		}
+		
+		Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+			return raw_queues.get(priority.ordinal());
+		}
 	}
 	
 	/**
@@ -191,7 +263,7 @@
 	private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
 		= new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
 	
-	private int number_thread = 1;
+	private int number_thread;
 	
 	/**
 	 * Main thread that reads messages from queues and sends them to switches.
@@ -213,12 +285,12 @@
 					// wait for message pushed to queue
 					mutex.acquire();
 				} catch (InterruptedException e) {
-					// not an error
+					// Interrupted to be shut down (not an error)
 					log.debug("FlowPusherThread is interrupted");
 					return;
 				}
 				
-				// for safety of concurrent access, copy all key objects
+				// for safety of concurrent access, copy set of key objects
 				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
 				for (IOFSwitch sw : queues.keySet()) {
 					keys.add(sw);
@@ -227,21 +299,19 @@
 				for (IOFSwitch sw : keys) {
 					SwitchQueue queue = queues.get(sw);
 
-					// Skip if queue is suspended
-					if (sw == null || queue == null ||
-							queue.state != QueueState.READY) {
+					if (sw == null || queue == null) {
 						continue;
 					}
 					
 					synchronized (queue) {
 						processQueue(sw, queue, MAX_MESSAGE_SEND);
-						if (queue.isEmpty()) {
+						if (! queue.hasMessageToSend()) {
 							// remove queue if flagged to be.
 							if (queue.toBeDeleted) {
 								queues.remove(sw);
 							}
 						} else {
-							// if some messages remains in queue, latch down
+							// Free the latch if message remains in queue
 							if (mutex.availablePermits() == 0) {
 								mutex.release();
 							}
@@ -266,7 +336,7 @@
 			
 			if (queue.isSendable(current_time)) {
 				int i = 0;
-				while (! queue.isEmpty()) {
+				while (queue.hasMessageToSend()) {
 					// Number of messages excess the limit
 					if (0 < max_msg && max_msg <= i) {
 						break;
@@ -301,6 +371,7 @@
 	 * Initialize object with one thread.
 	 */
 	public FlowPusher() {
+		number_thread = DEFAULT_NUMBER_THREAD;
 	}
 	
 	/**
@@ -308,7 +379,11 @@
 	 * @param number_thread Number of threads to handle messages.
 	 */
 	public FlowPusher(int number_thread) {
-		this.number_thread = number_thread;
+		if (number_thread > 0) {
+			this.number_thread = number_thread;
+		} else {
+			this.number_thread = DEFAULT_NUMBER_THREAD;
+		}
 	}
 	
 	/**
@@ -364,7 +439,8 @@
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
-			return false;
+			// create queue in case suspend is called before first message addition
+			queue = createQueueImpl(sw);
 		}
 		
 		synchronized (queue) {
@@ -381,6 +457,7 @@
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
+			log.error("No queue is attached to DPID : {}", sw.getId());
 			return false;
 		}
 		
@@ -388,9 +465,9 @@
 			if (queue.state == QueueState.SUSPENDED) {
 				queue.state = QueueState.READY;
 				
-				// Latch down if queue is not empty
-				FlowPusherThread thread = getProcess(sw);
-				if (! queue.isEmpty() &&
+				// Free the latch if queue has any messages
+				FlowPusherThread thread = getProcessingThread(sw);
+				if (queue.hasMessageToSend() &&
 						thread.mutex.availablePermits() == 0) {
 					thread.mutex.release();
 				}
@@ -401,15 +478,14 @@
 	}
 	
 	@Override
-	public boolean isSuspended(IOFSwitch sw) {
+	public QueueState getState(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
 		
 		if (queue == null) {
-			// TODO Is true suitable for this case?
-			return true;
+			return QueueState.UNKNOWN;
 		}
 		
-		return (queue.state == QueueState.SUSPENDED);
+		return queue.state;
 	}
 
 	/**
@@ -429,7 +505,7 @@
 	public void setRate(IOFSwitch sw, long rate) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue == null) {
-			return;
+			queue = createQueueImpl(sw);
 		}
 		
 		if (rate > 0) {
@@ -442,17 +518,23 @@
 
 	@Override
 	public boolean createQueue(IOFSwitch sw) {
+		SwitchQueue queue = createQueueImpl(sw);
+		
+		return (queue != null);
+	}
+	
+	protected SwitchQueue createQueueImpl(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue != null) {
-			return false;
+			return queue;
 		}
 		
-		FlowPusherThread proc = getProcess(sw);
+		FlowPusherThread proc = getProcessingThread(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
 		proc.queues.put(sw, queue);
 		
-		return true;
+		return queue;
 	}
 
 	@Override
@@ -462,7 +544,7 @@
 	
 	@Override
 	public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
-		FlowPusherThread proc = getProcess(sw);
+		FlowPusherThread proc = getProcessingThread(sw);
 		
 		if (forceStop) {
 			SwitchQueue queue = proc.queues.remove(sw);
@@ -484,34 +566,50 @@
 	
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		return addMessageImpl(sw, msg);
+		return add(sw, msg, MsgPriority.NORMAL);
+	}
+	
+	@Override
+	public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+		return addMessageImpl(sw, msg, priority);
 	}
 	
 	@Override
 	public void pushFlowEntries(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+		pushFlowEntries(entries, MsgPriority.NORMAL);
+	}
+	
+	@Override
+	public void pushFlowEntries(
+		Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
 
 		for (Pair<IOFSwitch, FlowEntry> entry : entries) {
-			add(entry.first, entry.second);
+			add(entry.first, entry.second, priority);
 		}
 	}
 
 	@Override
 	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+	    pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+	}
+
+	@Override
+	public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
 	    Collection<Pair<IOFSwitch, FlowEntry>> entries = 
 		new LinkedList<Pair<IOFSwitch, FlowEntry>>();
 
 	    entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
-	    pushFlowEntries(entries);
+	    pushFlowEntries(entries, priority);
 	}
-
+	
 	/**
 	 * Create a message from FlowEntry and add it to the queue of the switch.
 	 * @param sw Switch to which message is pushed.
 	 * @param flowEntry FlowEntry object used for creating message.
 	 * @return true if message is successfully added to a queue.
 	 */
-	private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
+	private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
 		//
 		// Create the OpenFlow Flow Modification Entry to push
 		//
@@ -768,20 +866,19 @@
 		    fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
 		}
 
-		//
-		// Write the message to the switch
-		//
-		log.debug("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
-			, flowEntry.flowEntryUserState()
-			, sw.getStringId()
-			, flowEntry.flowEntryId()
-			, matchSrcMac
-			, matchDstMac
-			, matchInPort
-			, actionOutputPort
-			);
+		if (log.isTraceEnabled()) {
+			log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+					, flowEntry.flowEntryUserState()
+					, sw.getStringId()
+					, flowEntry.flowEntryId()
+					, matchSrcMac
+					, matchDstMac
+					, matchInPort
+					, actionOutputPort
+					);
+		}
 
-		return addMessageImpl(sw, fm);
+		return addMessageImpl(sw, fm, priority);
 	}
 	
 	/**
@@ -791,19 +888,18 @@
 	 * @param flowEntryId
 	 * @return
 	 */
-	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
-		FlowPusherThread proc = getProcess(sw);
-		SwitchQueue queue = proc.queues.get(sw);
+	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+		FlowPusherThread proc = getProcessingThread(sw);
+		SwitchQueue queue = getQueue(sw);
 
 		// create queue at first addition of message
 		if (queue == null) {
-			createQueue(sw);
-			queue = getQueue(sw);
+			queue = createQueueImpl(sw);
 		}
 		
 		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
 		synchronized (queue) {
-			queue.add(entry);
+			queue.add(entry,priority);
 			if (log.isTraceEnabled()) {
 				log.trace("Message is pushed : {}", entry.getOFMessage());
 			}
@@ -849,7 +945,7 @@
 		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
 		barrierFutures.put(BarrierInfo.create(sw,msg), future);
 		
-		addMessageImpl(sw, msg);
+		addMessageImpl(sw, msg, MsgPriority.NORMAL);
 		
 		return future;
 	}
@@ -871,7 +967,12 @@
 			return null;
 		}
 		
-		return getProcess(sw).queues.get(sw);
+		FlowPusherThread th = getProcessingThread(sw);
+		if (th == null) {
+			return null;
+		}
+		
+		return th.queues.get(sw);
 	}
 	
 	/**
@@ -890,7 +991,7 @@
 	 * @param sw Switch object
 	 * @return Thread object
 	 */
-	protected FlowPusherThread getProcess(IOFSwitch sw) {
+	protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
 		return threadMap.get(hash);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 17ef14c..c1571c8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -29,6 +29,7 @@
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService.MsgPriority;
 import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -97,8 +98,7 @@
 	double graphIDTime, switchTime, compareTime, graphEntryTime, extractTime, pushTime, totalTime;
 	@Override
 	public SyncResult call() {
-	    // TODO: stop adding other flow entries while synchronizing
-	    //pusher.suspend(sw);
+	    pusher.suspend(sw);
 	    long start = System.nanoTime();
 	    Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
 	    long step1 = System.nanoTime();
@@ -115,7 +115,7 @@
 	    compareTime = (step3 - step2);
 	    totalTime = (step3 - start);
 	    outputTime();
-	    //pusher.resume(sw);
+	    pusher.resume(sw);
 	    
 	    return result;
 	}
@@ -301,7 +301,7 @@
 	    extractTime = System.nanoTime() - startExtract;
 
 	    double startPush = System.nanoTime();
-	    pusher.pushFlowEntry(sw, flowEntry);
+	    pusher.pushFlowEntry(sw, flowEntry, MsgPriority.HIGH);
 	    pushTime = System.nanoTime() - startPush;
 	}
 
@@ -325,7 +325,7 @@
 	    fm.setPriority(statisticsReply.getPriority());
 	    fm.setOutPort(OFPort.OFPP_NONE);
 
-	    pusher.add(sw, fm);
+	    pusher.add(sw, fm, MsgPriority.HIGH);
 	}
 
 	/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 6bf20d9..29a579e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -21,6 +21,18 @@
  *
  */
 public interface IFlowPusherService extends IFloodlightService {
+	public static enum MsgPriority {
+		HIGH,		// High priority: e.g. flow synchronization
+		NORMAL,		// Normal priority
+//		LOW,		// Low priority, not needed for now
+	}
+	
+    public static enum QueueState {
+		READY,		// Queues with all priority are at work
+		SUSPENDED,	// Only prior queue is at work
+		UNKNOWN
+	}
+
 	/**
 	 * Create a queue correspondent to the switch.
 	 * @param sw Switch to which new queue is attached.
@@ -49,7 +61,7 @@
 	boolean deleteQueue(IOFSwitch sw, boolean forceStop);
 	
 	/**
-	 * Add a message to the queue of the switch.
+	 * Add a message to the queue of the switch with normal priority.
 	 *
 	 * Note: Notification is NOT delivered for the pushed message.
 	 *
@@ -60,7 +72,18 @@
 	boolean add(IOFSwitch sw, OFMessage msg);
 
 	/**
-	 * Push a collection of Flow Entries to the corresponding switches.
+	 * Add a message to the queue of the switch with specific priority.
+	 *
+	 * @param sw Switch to which message is pushed.
+	 * @param msg Message object to be added.
+	 * @param priority Sending priority of the message.
+	 * @return true if message is successfully added to a queue.
+	 */
+	boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority);
+	
+	/**
+	 * Push a collection of Flow Entries to the corresponding switches
+	 * with normal priority.
 	 *
 	 * Note: Notification is delivered for the Flow Entries that
 	 * are pushed successfully.
@@ -71,8 +94,22 @@
 	void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries);
 
 	/**
+	 * Push a collection of Flow Entries to the corresponding switches
+	 * with specific priority.
+	 *
+	 * Note: Notification is delivered for the Flow Entries that
+	 * are pushed successfully.
+	 *
+	 * @param entries the collection of <IOFSwitch, FlowEntry> pairs
+	 * to push.
+	 * @param priority Sending priority of flow entries.
+	 */
+	void pushFlowEntries(Collection<Pair<IOFSwitch, FlowEntry>> entries,
+			MsgPriority priority);
+	
+	/**
 	 * Create a message from FlowEntry and add it to the queue of the
-	 * switch.
+	 * switch with normal priority.
 	 *
 	 * Note: Notification is delivered for the Flow Entries that
 	 * are pushed successfully.
@@ -84,6 +121,20 @@
 	void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry);
 
 	/**
+	 * Create a message from FlowEntry and add it to the queue of the
+	 * switch with specific priority.
+	 *
+	 * Note: Notification is delivered for the Flow Entries that
+	 * are pushed successfully.
+	 *
+	 * @param sw Switch to which message is pushed.
+	 * @param flowEntry FlowEntry object used for creating message.
+	 * @return true if message is successfully added to a queue.
+	 */
+	void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry,
+			MsgPriority priority);
+	
+	/**
 	 * Set sending rate to a switch.
 	 * @param sw Switch.
 	 * @param rate Rate in bytes/ms.
@@ -119,9 +170,9 @@
 	boolean resume(IOFSwitch sw);
 	
 	/**
-	 * Get whether pushing of message is suspended or not.
+	 * Get state of queue attached to a switch.
 	 * @param sw Switch to be checked.
-	 * @return true if suspended.
+	 * @return State of queue.
 	 */
-	boolean isSuspended(IOFSwitch sw);
+	QueueState getState(IOFSwitch sw);
 }
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java
new file mode 100644
index 0000000..28f221d
--- /dev/null
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizerTest.java
@@ -0,0 +1,320 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.graph.DBOperation;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.flowmanager.FlowDatabaseOperation;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService.MsgPriority;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowSyncService.SyncResult;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFStatisticsRequest;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.statistics.OFFlowStatisticsReply;
+import org.openflow.protocol.statistics.OFStatistics;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+// Test should be fixed to fit RAMCloud basis
+@Ignore
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({FlowSynchronizer.class, DBOperation.class, FlowDatabaseOperation.class})
+public class FlowSynchronizerTest {
+	private FlowPusher pusher;
+	private FlowSynchronizer sync;
+	private List<Long> idAdded;
+	private List<Long> idRemoved;
+
+	@Before
+	public void setUp() throws Exception {
+		idAdded = new ArrayList<Long>();
+		idRemoved = new ArrayList<Long>();
+		
+		pusher = EasyMock.createMock(FlowPusher.class);
+		EasyMock.expect(pusher.suspend(EasyMock.anyObject(IOFSwitch.class))).andReturn(true).anyTimes();
+		EasyMock.expect(pusher.resume(EasyMock.anyObject(IOFSwitch.class))).andReturn(true).anyTimes();
+		pusher.add(EasyMock.anyObject(IOFSwitch.class), EasyMock.anyObject(OFMessage.class),
+				EasyMock.eq(MsgPriority.HIGH));
+		EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+				@Override
+				public Object answer() throws Throwable {
+					OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
+					if (msg.getType().equals(OFType.FLOW_MOD)) {
+						OFFlowMod fm = (OFFlowMod)msg;
+						if (fm.getCommand() == OFFlowMod.OFPFC_DELETE_STRICT) {
+							idRemoved.add(fm.getCookie());
+						}
+					}
+					return null;
+				}
+			}).anyTimes();
+		pusher.pushFlowEntry(EasyMock.anyObject(IOFSwitch.class), EasyMock.anyObject(FlowEntry.class),
+				EasyMock.eq(MsgPriority.HIGH));
+		EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+			@Override
+			public Object answer() throws Throwable {
+				FlowEntry flow = (FlowEntry)EasyMock.getCurrentArguments()[1];
+				idAdded.add(flow.flowEntryId().value());
+				return null;
+			}
+		}).anyTimes();
+		EasyMock.replay(pusher);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+	/**
+	 * Test that synchronization doesn't affect anything in case either DB and
+	 * flow table has the same entries.
+	 */
+	@Test
+	public void testStable() {
+		// Create mock of flow table : flow 1
+		IOFSwitch sw = createMockSwitch(new long[] {1});
+		
+		// Create mock of flow entries : flow 1
+		initMockGraph(new long[] {1});
+		
+		// synchronize
+		doSynchronization(sw);
+		
+		// check if flow is not changed
+		assertEquals(0, idAdded.size());
+		assertEquals(0, idRemoved.size());
+	}
+
+	/**
+	 * Test that an flow is added in case DB has an extra FlowEntry.
+	 */
+	@Test
+	public void testSingleAdd() {
+		// Create mock of flow table : null
+		IOFSwitch sw = createMockSwitch(new long[] {});
+		
+		// Create mock of flow entries : flow 1
+		initMockGraph(new long[] {1});
+		
+		// synchronize
+		doSynchronization(sw);
+		
+		// check if single flow is installed
+		assertEquals(1, idAdded.size());
+		assertTrue(idAdded.contains((long)1));
+		assertEquals(0, idRemoved.size());
+	}
+
+	/**
+	 * Test that an flow is deleted in case switch has an extra FlowEntry.
+	 */
+	@Test
+	public void testSingleDelete() {
+		// Create mock of flow table : flow 1
+		IOFSwitch sw = createMockSwitch(new long[] {1});
+		
+		// Create mock of flow entries : null
+		initMockGraph(new long[] {});
+		
+		// synchronize
+		doSynchronization(sw);
+		
+		// check if single flow is deleted
+		assertEquals(0, idAdded.size());
+		assertEquals(1, idRemoved.size());
+		assertTrue(idRemoved.contains((long)1));
+	}
+	
+	/**
+	 * Test that appropriate flows are added and other appropriate flows are deleted
+	 * in case flows in DB are overlapping flows in switch.
+	 */
+	@Test
+	public void testMixed() {
+		// Create mock of flow table : flow 1,2,3
+		IOFSwitch sw = createMockSwitch(new long[] {1,2,3});
+		
+		// Create mock of flow entries : flow 2,3,4,5
+		initMockGraph(new long[] {2,3,4,5});
+		
+		// synchronize
+		doSynchronization(sw);
+		
+		// check if two flows {4,5} is installed and one flow {1} is deleted
+		assertEquals(2, idAdded.size());
+		assertTrue(idAdded.contains((long)4));
+		assertTrue(idAdded.contains((long)5));
+		assertEquals(1, idRemoved.size());
+		assertTrue(idRemoved.contains((long)1));
+	}
+	
+
+	@Test
+	public void testMassive() {
+		// Create mock of flow table : flow 0-1999
+		long [] swIdList = new long [2000];
+		for (long i = 0; i < 2000; ++i) {
+			swIdList[(int)i] = i;
+		}
+		IOFSwitch sw = createMockSwitch(swIdList);
+		
+		// Create mock of flow entries : flow 1500-3499
+		long [] dbIdList = new long [2000];
+		for (long i = 0; i < 2000; ++i) {
+			dbIdList[(int)i] = 1500 + i;
+		}
+		initMockGraph(dbIdList);
+
+		// synchronize
+		doSynchronization(sw);
+		
+		// check if 1500 flows {2000-3499} is installed and 1500 flows {0,...,1499} is deleted
+		assertEquals(1500, idAdded.size());
+		for (long i = 2000; i < 3500; ++i) {
+			assertTrue(idAdded.contains(i));
+		}
+		assertEquals(1500, idRemoved.size());
+		for (long i = 0; i < 1500; ++i) {
+			assertTrue(idRemoved.contains(i));
+		}
+	}
+
+	/**
+	 * Create mock IOFSwitch with flow table which has arbitrary flows.
+	 * @param cookieList List of FlowEntry IDs switch has.
+	 * @return Mock object.
+	 */
+	private IOFSwitch createMockSwitch(long[] cookieList) {
+		IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
+		EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+		
+		List<OFStatistics> stats = new ArrayList<OFStatistics>();
+		for (long cookie : cookieList) {
+			stats.add(createReply(cookie));
+		}
+		
+		@SuppressWarnings("unchecked")
+		Future<List<OFStatistics>> future = EasyMock.createMock(Future.class);
+		try {
+			EasyMock.expect(future.get()).andReturn(stats).once();
+		} catch (InterruptedException e1) {
+			fail("Failed in Future#get()");
+		} catch (ExecutionException e1) {
+			fail("Failed in Future#get()");
+		}
+		EasyMock.replay(future);
+		
+		try {
+			EasyMock.expect(sw.getStatistics(EasyMock.anyObject(OFStatisticsRequest.class)))
+				.andReturn(future).once();
+		} catch (IOException e) {
+			fail("Failed in IOFSwitch#getStatistics()");
+		}
+		
+		EasyMock.replay(sw);
+		return sw;
+	}
+	
+	/**
+	 * Create single OFFlowStatisticsReply object which is actually obtained from switch.
+	 * @param cookie Cookie value, which indicates ID of FlowEntry installed to switch.
+	 * @return Created object.
+	 */
+	private OFFlowStatisticsReply createReply(long cookie) {
+		OFFlowStatisticsReply stat = new OFFlowStatisticsReply();
+		OFMatch match = new OFMatch();
+		
+		stat.setCookie(cookie);
+		stat.setMatch(match);
+		stat.setPriority((short)1);
+
+		return stat;
+	}
+	
+	/**
+	 * Create mock GraphDBOperation and FlowDatabaseOperation to mock DB.
+	 * @param idList List of FlowEntry IDs stored in DB.
+	 */
+	private void initMockGraph(long[] idList) {
+		List<IFlowEntry> flowEntryList = new ArrayList<IFlowEntry>();
+		
+		for (long id : idList) {
+			IFlowEntry entry = EasyMock.createMock(IFlowEntry.class);
+			EasyMock.expect(entry.getFlowEntryId()).andReturn(String.valueOf(id)).anyTimes();
+			EasyMock.replay(entry);
+			flowEntryList.add(entry);
+		}
+		
+		ISwitchObject swObj = EasyMock.createMock(ISwitchObject.class);
+		EasyMock.expect(swObj.getFlowEntries()).andReturn(flowEntryList).once();
+		EasyMock.replay(swObj);
+		
+		DBOperation mockOp = PowerMock.createMock(DBOperation.class);
+		EasyMock.expect(mockOp.searchSwitch(EasyMock.anyObject(String.class))).andReturn(swObj).once();
+		
+		PowerMock.mockStatic(FlowDatabaseOperation.class);
+		for (IFlowEntry entry : flowEntryList) {
+			EasyMock.expect(FlowDatabaseOperation.extractFlowEntry(EasyMock.eq(entry)))
+				.andAnswer(new IAnswer<FlowEntry>() {
+					@Override
+					public FlowEntry answer() throws Throwable {
+						IFlowEntry iflow = (IFlowEntry)EasyMock.getCurrentArguments()[0];
+						long flowEntryId = Long.valueOf(iflow.getFlowEntryId());
+						
+						FlowEntry flow = EasyMock.createMock(FlowEntry.class);
+						EasyMock.expect(flow.flowEntryId()).andReturn(new FlowEntryId(flowEntryId)).anyTimes();
+						EasyMock.replay(flow);
+						return flow;
+					}
+					
+				}).anyTimes();
+			EasyMock.expect(mockOp.searchFlowEntry(EasyMock.eq(new FlowEntryId(entry.getFlowEntryId()))))
+				.andReturn(entry);
+		}
+		PowerMock.replay(FlowDatabaseOperation.class);
+		EasyMock.replay(mockOp);
+		
+		try {
+			PowerMock.expectNew(DBOperation.class).andReturn(mockOp);
+		} catch (Exception e) {
+			fail("Failed to create DBOperation");
+		}
+		PowerMock.replay(DBOperation.class);
+	}
+	
+	/**
+	 * Instantiate FlowSynchronizer and sync flows.
+	 * @param sw Target IOFSwitch object
+	 */
+	private void doSynchronization(IOFSwitch sw) {
+		sync = new FlowSynchronizer();
+		sync.init(pusher);
+		Future<SyncResult> future = sync.synchronize(sw);
+		try {
+			future.get();
+		} catch (Exception e) {
+			fail("Failed to Future#get()");
+		}
+	}
+}