Cherry-pick from https://gerrit.onos.onlab.us/#/c/92/4 (partially).

NOTE: Abandoned the implementation about automatic-barrier function.

FlowEntryAdded event is changed to be occurred after reception of BARRIER_REPLY (ONOS-915).
Fixed a bug that the same FlowEntry is installed to switch multiple times.

Change-Id: Ibb5cdaf9a478e2697232203d9190dedb4792f108

Fixed a bug that FlowEntries are shallow-copied in FlowPusher.

Change-Id: I704d245d3660a91f5e60fd6cdf05f89b1fba6c25

Fixed a problem that FlowPusherTest fails often(ONOS-984).

Change-Id: I2a290d5dc756e43a8058c6f35372f2e2932a8d73
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 21be62c..d2af973 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -11,6 +11,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
@@ -29,7 +30,6 @@
 import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.floodlightcontroller.util.MACAddress;
 import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
@@ -54,7 +54,6 @@
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
-    protected volatile IFlowService flowManager;
 
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
@@ -70,6 +69,18 @@
 		READY,
 		SUSPENDED,
 	}
+
+	private static class SwitchQueueEntry {
+		OFMessage msg;
+		
+		public SwitchQueueEntry(OFMessage msg) {
+			this.msg = msg;
+		}
+		
+		public OFMessage getOFMessage() {
+			return msg;
+		}
+	}
 	
 	/**
 	 * SwitchQueue represents message queue attached to a switch.
@@ -77,7 +88,7 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class SwitchQueue extends ArrayDeque<OFMessage> {
+	private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
 		private static final long serialVersionUID = 1L;
 
 		QueueState state;
@@ -119,6 +130,52 @@
 			last_sent_time = current;
 			last_sent_size = size;
 		}
+	}
+	
+	/**
+	 * BarrierInfo holds information to specify barrier message sent to switch.
+	 * @author Naoki
+	 */
+	private static class BarrierInfo {
+		final long dpid;
+		final int xid;
+		
+		static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+			return new BarrierInfo(sw.getId(), req.getXid());
+		}
+		
+		static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+			return new BarrierInfo(sw.getId(), rpy.getXid());
+		}
+		
+		private BarrierInfo(long dpid, int xid) {
+			this.dpid = dpid;
+			this.xid = xid;
+		}
+
+		// Auto generated code by Eclipse
+		@Override
+		public int hashCode() {
+			final int prime = 31;
+			int result = 1;
+			result = prime * result + (int) (dpid ^ (dpid >>> 32));
+			result = prime * result + xid;
+			return result;
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (this == obj)
+				return true;
+			if (obj == null)
+				return false;
+			if (getClass() != obj.getClass())
+				return false;
+			
+			BarrierInfo other = (BarrierInfo) obj;
+			return (this.dpid == other.dpid) && (this.xid == other.xid);
+		}
+		
 		
 	}
 	
@@ -130,9 +187,9 @@
 	
 	// Map of threads versus dpid
 	private Map<Long, FlowPusherThread> threadMap = null;
-	// Map of Future objects versus dpid and transaction ID.
-	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
-		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
+	// Map from (DPID and transaction ID) to Future objects.
+	private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+		= new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
 	
 	private int number_thread = 1;
 	
@@ -143,7 +200,7 @@
 	 */
 	private class FlowPusherThread extends Thread {
 		private Map<IOFSwitch,SwitchQueue> queues
-			= new HashMap<IOFSwitch,SwitchQueue>();
+			= new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
 		
 		// reusable latch used for waiting for arrival of message
 		private Semaphore mutex = new Semaphore(0);
@@ -163,10 +220,8 @@
 				
 				// for safety of concurrent access, copy all key objects
 				Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
-				synchronized (queues) {
-					for (IOFSwitch sw : queues.keySet()) {
-						keys.add(sw);
-					}
+				for (IOFSwitch sw : queues.keySet()) {
+					keys.add(sw);
 				}
 				
 				for (IOFSwitch sw : keys) {
@@ -183,9 +238,7 @@
 						if (queue.isEmpty()) {
 							// remove queue if flagged to be.
 							if (queue.toBeDeleted) {
-								synchronized (queues) {
-									queues.remove(sw);
-								}
+								queues.remove(sw);
 							}
 						} else {
 							// if some messages remains in queue, latch down
@@ -206,7 +259,7 @@
 		 * @param max_msg Limitation of number of messages to be sent. If set to 0,
 		 *                all messages in queue will be sent.
 		 */
-		private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
+		private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
 			// check sending rate and determine it to be sent or not
 			long current_time = System.currentTimeMillis();
 			long size = 0;
@@ -220,16 +273,24 @@
 					}
 					++i;
 					
-					OFMessage msg = queue.poll();
+					SwitchQueueEntry queueEntry;
+					synchronized (queue) {
+						queueEntry = queue.poll();
+					}
+					
+					OFMessage msg = queueEntry.getOFMessage();
 					try {
 						messageDamper.write(sw, msg, context);
-//						log.debug("Pusher sends message : {}", msg);
+						if (log.isTraceEnabled()) {
+							log.trace("Pusher sends message : {}", msg);
+						}
 						size += msg.getLength();
 					} catch (IOException e) {
 						e.printStackTrace();
 						log.error("Exception in sending message ({}) : {}", msg, e);
 					}
 				}
+				
 				sw.flush();
 				queue.logSentData(current_time, size);
 			}
@@ -269,7 +330,6 @@
 		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
 		IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
 		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
-		flowManager = modContext.getServiceImpl(IFlowService.class);
 		
 		if (damper != null) {
 			messageDamper = damper;
@@ -374,7 +434,9 @@
 		
 		if (rate > 0) {
 			log.debug("rate for {} is set to {}", sw.getId(), rate);
-			queue.max_rate = rate;
+			synchronized (queue) {
+				queue.max_rate = rate;
+			}
 		}
 	}
 
@@ -388,9 +450,7 @@
 		FlowPusherThread proc = getProcess(sw);
 		queue = new SwitchQueue();
 		queue.state = QueueState.READY;
-		synchronized (proc.queues) {
-			proc.queues.put(sw, queue);
-		}
+		proc.queues.put(sw, queue);
 		
 		return true;
 	}
@@ -405,11 +465,9 @@
 		FlowPusherThread proc = getProcess(sw);
 		
 		if (forceStop) {
-			synchronized (proc.queues) {
-				SwitchQueue queue = proc.queues.remove(sw);
-				if (queue == null) {
-					return false;
-				}
+			SwitchQueue queue = proc.queues.remove(sw);
+			if (queue == null) {
+				return false;
 			}
 			return true;
 		} else {
@@ -426,48 +484,16 @@
 	
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		FlowPusherThread proc = getProcess(sw);
-		SwitchQueue queue = proc.queues.get(sw);
-
-		// create queue at first addition of message
-		if (queue == null) {
-			createQueue(sw);
-			queue = getQueue(sw);
-		}
-		
-		synchronized (queue) {
-			queue.add(msg);
-//			log.debug("Message is pushed : {}", msg);
-		}
-		
-		if (proc.mutex.availablePermits() == 0) {
-			proc.mutex.release();
-		}
-
-		return true;
+		return addMessageImpl(sw, msg);
 	}
-
+	
 	@Override
 	public void pushFlowEntries(
 		Collection<Pair<IOFSwitch, FlowEntry>> entries) {
 
-		List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
-			new LinkedList<Pair<IOFSwitch, FlowEntry>>();
-
 		for (Pair<IOFSwitch, FlowEntry> entry : entries) {
-			if (add(entry.first, entry.second)) {
-				pushedEntries.add(entry);
-			}
+			add(entry.first, entry.second);
 		}
-
-		//
-		// TODO: We should use the OpenFlow Barrier mechanism
-		// to check for errors, and update the SwitchState
-		// for a flow entry after the Barrier message is
-		// is received.
-		// Only after inform the Flow Manager that the entry is pushed.
-		//
-		flowManager.flowEntriesPushedToSwitch(pushedEntries);
 	}
 
 	@Override
@@ -755,7 +781,39 @@
 			, actionOutputPort
 			);
 
-		return add(sw, fm);
+		return addMessageImpl(sw, fm);
+	}
+	
+	/**
+	 * Add message to queue
+	 * @param sw
+	 * @param msg
+	 * @param flowEntryId
+	 * @return
+	 */
+	protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
+		FlowPusherThread proc = getProcess(sw);
+		SwitchQueue queue = proc.queues.get(sw);
+
+		// create queue at first addition of message
+		if (queue == null) {
+			createQueue(sw);
+			queue = getQueue(sw);
+		}
+		
+		SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+		synchronized (queue) {
+			queue.add(entry);
+			if (log.isTraceEnabled()) {
+				log.trace("Message is pushed : {}", entry.getOFMessage());
+			}
+		}
+		
+		if (proc.mutex.availablePermits() == 0) {
+			proc.mutex.release();
+		}
+
+		return true;
 	}
 	
 	@Override
@@ -786,24 +844,23 @@
 			return null;
 		}
 		
-		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
-		msg.setXid(sw.getNextTransactionId());
-
-		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
-		synchronized (barrierFutures) {
-			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
-			if (map == null) {
-				map = new HashMap<Integer,OFBarrierReplyFuture>();
-				barrierFutures.put(sw.getId(), map);
-			}
-			map.put(msg.getXid(), future);
-		}
+		OFBarrierRequest msg = createBarrierRequest(sw);
 		
-		add(sw, msg);
+		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+		barrierFutures.put(BarrierInfo.create(sw,msg), future);
+		
+		addMessageImpl(sw, msg);
 		
 		return future;
 	}
 
+	protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+		msg.setXid(sw.getNextTransactionId());
+		
+		return msg;
+	}
+	
 	/**
 	 * Get a queue attached to a switch.
 	 * @param sw Switch object
@@ -838,7 +895,7 @@
 		
 		return threadMap.get(hash);
 	}
-
+	
 	@Override
 	public String getName() {
 		return "flowpusher";
@@ -856,22 +913,25 @@
 
 	@Override
 	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
-		Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
-		if (map == null) {
-			log.debug("null map for {} : {}", sw.getId(), barrierFutures);
+		if (log.isTraceEnabled()) {
+			log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+		}
+
+		if (msg.getType() != OFType.BARRIER_REPLY) {
+			log.error("Unexpected reply message : {}", msg.getType());
 			return Command.CONTINUE;
 		}
 		
-		OFBarrierReplyFuture future = map.get(msg.getXid());
-		if (future == null) {
-			log.debug("null future for {} : {}", msg.getXid(), map);
-			return Command.CONTINUE;
-		}
+		OFBarrierReply reply = (OFBarrierReply) msg;
+		BarrierInfo info = BarrierInfo.create(sw,reply);
 		
-		log.debug("Received BARRIER_REPLY : {}", msg);
-		future.deliverFuture(sw, msg);
+		// Deliver future if exists
+		OFBarrierReplyFuture future = barrierFutures.get(info);
+		if (future != null) {
+			future.deliverFuture(sw, msg);
+			barrierFutures.remove(info);
+		}
 		
 		return Command.CONTINUE;
 	}
-
 }
diff --git a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
index 4779b75..ae61707 100644
--- a/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
+++ b/src/test/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusherTest.java
@@ -4,7 +4,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -17,7 +16,6 @@
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryActions;
@@ -31,6 +29,7 @@
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Test;
+import org.openflow.protocol.OFBarrierReply;
 import org.openflow.protocol.OFBarrierRequest;
 import org.openflow.protocol.OFFlowMod;
 import org.openflow.protocol.OFMatch;
@@ -47,7 +46,6 @@
 	private OFMessageDamper damper;
 	private IFloodlightProviderService flProviderService;
 	private IThreadPoolService threadPoolService;
-	private IFlowService flowService;
 
 	/**
 	 * Test single OFMessage is correctly sent to single switch via MessageDamper.
@@ -86,9 +84,9 @@
 		} catch (InterruptedException e) {
 			fail("Failed in Thread.sleep()");
 		}
-		
 		EasyMock.verify(msg);
 		EasyMock.verify(sw);
+		verifyAll();
 		
 		pusher.stop();
 	}
@@ -135,7 +133,7 @@
 		
 		try {
 			// wait until message is processed.
-			Thread.sleep(1000);
+			Thread.sleep(5000);
 		} catch (InterruptedException e) {
 			fail("Failed in Thread.sleep()");
 		}
@@ -144,6 +142,7 @@
 			EasyMock.verify(msg);
 		}
 		EasyMock.verify(sw);
+		verifyAll();
 		
 		pusher.stop();
 	}
@@ -209,7 +208,8 @@
 			
 			EasyMock.verify(sw);
 		}
-		
+		verifyAll();
+
 		pusher.stop();
 	}
 	
@@ -274,7 +274,8 @@
 			
 			EasyMock.verify(sw);
 		}
-		
+		verifyAll();
+
 		pusher.stop();
 	}
 	
@@ -365,7 +366,8 @@
 			EasyMock.verify(msg);
 		}
 		EasyMock.verify(sw);
-		
+		verifyAll();
+
 		pusher.stop();
 	}
 
@@ -396,9 +398,20 @@
 		OFBarrierReplyFuture future = pusher.barrierAsync(sw);
 		
 		assertNotNull(future);
+		
+		try {
+			Thread.sleep(1000);
+		} catch (InterruptedException e) {
+			fail("Failed to sleep");
+		}
+		
+		verifyAll();
+
 		pusher.stop();
 	}
 	
+	static final int XID_TO_VERIFY = 100;
+	static final long DPID_TO_VERIFY = 10;
 	/**
 	 * Test FlowObject is correctly converted to message and is sent to a switch.
 	 */
@@ -409,7 +422,7 @@
 		
 		// instantiate required objects
 		FlowEntry flowEntry1 = new FlowEntry();
-		flowEntry1.setDpid(new Dpid(1));
+		flowEntry1.setDpid(new Dpid(DPID_TO_VERIFY));
 		flowEntry1.setFlowId(new FlowId(1));
 		flowEntry1.setInPort(new Port((short) 1));
 		flowEntry1.setOutPort(new Port((short) 11));
@@ -432,25 +445,18 @@
 		EasyMock.expect(msg.setActions((List<OFAction>)EasyMock.anyObject())).andReturn(msg);
 		EasyMock.expect(msg.setLengthU(EasyMock.anyShort())).andReturn(msg);
 		EasyMock.expect(msg.setOutPort(EasyMock.anyShort())).andReturn(msg).atLeastOnce();
-		EasyMock.expect(msg.getXid()).andReturn(1).anyTimes();
+		EasyMock.expect(msg.getXid()).andReturn(XID_TO_VERIFY).anyTimes();
 		EasyMock.expect(msg.getType()).andReturn(OFType.FLOW_MOD).anyTimes();
 		EasyMock.expect(msg.getLength()).andReturn((short)100).anyTimes();
 		EasyMock.replay(msg);
 		
 		EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.FLOW_MOD))).andReturn(msg);
-		
-		ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
-		EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
-				(TimeUnit)EasyMock.anyObject())).andReturn(null).once();
-		EasyMock.replay(executor);
-		EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
 
 		IOFSwitch sw = EasyMock.createMock(IOFSwitch.class);
-		EasyMock.expect(sw.getId()).andReturn((long)1).anyTimes();
+		EasyMock.expect(sw.getId()).andReturn(DPID_TO_VERIFY).anyTimes();
 		EasyMock.expect(sw.getStringId()).andReturn("1").anyTimes();
 		sw.flush();
 		EasyMock.expectLastCall().once();
-		EasyMock.replay(sw);
 		
 		try {
 			EasyMock.expect(damper.write(EasyMock.eq(sw), EasyMock.anyObject(OFMessage.class), EasyMock.eq(context)))
@@ -458,14 +464,18 @@
 					@Override
 					public Boolean answer() throws Throwable {
 						OFMessage msg = (OFMessage)EasyMock.getCurrentArguments()[1];
-						assertEquals(msg.getType(), OFType.FLOW_MOD);
+						if (msg.getType() == OFType.FLOW_MOD) {
+							assertEquals(msg.getXid(), XID_TO_VERIFY);
+						}
 						return true;
 					}
-				}).once();
+				}).atLeastOnce();
 		} catch (IOException e1) {
 			fail("Failed in OFMessageDamper#write()");
 		}
 		
+		EasyMock.replay(sw);
+		
 		endInitMock();
 		initPusher(1);
 
@@ -478,11 +488,11 @@
 		}
 		
 		EasyMock.verify(sw);
-		
+		verifyAll();
+
 		pusher.stop();
 	}
 	
-	@SuppressWarnings("unchecked")
 	private void beginInitMock() {
 		context = EasyMock.createMock(FloodlightContext.class);
 		modContext = EasyMock.createMock(FloodlightModuleContext.class);
@@ -490,24 +500,23 @@
 		damper = EasyMock.createMock(OFMessageDamper.class);
 		flProviderService = EasyMock.createMock(IFloodlightProviderService.class);
 		threadPoolService = EasyMock.createMock(IThreadPoolService.class);
-		flowService = EasyMock.createMock(IFlowService.class);
-		
-		flowService.flowEntriesPushedToSwitch(EasyMock.anyObject(Collection.class));
-		EasyMock.expectLastCall().anyTimes();
 		
 		EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IThreadPoolService.class)))
 			.andReturn(threadPoolService).once();
 		EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFloodlightProviderService.class)))
 			.andReturn(flProviderService).once();
-		EasyMock.expect(modContext.getServiceImpl(EasyMock.eq(IFlowService.class)))
-			.andReturn(flowService).once();
 		flProviderService.addOFMessageListener(EasyMock.eq(OFType.BARRIER_REPLY),
 				(FlowPusher) EasyMock.anyObject());
 		EasyMock.expectLastCall().once();
+		
+		ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
+		EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
+				(TimeUnit)EasyMock.anyObject())).andReturn(null).once();
+		EasyMock.replay(executor);
+		EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor).anyTimes();
 	}
 	
 	private void endInitMock() {
-		EasyMock.replay(flowService);
 		EasyMock.replay(threadPoolService);
 		EasyMock.replay(flProviderService);
 		EasyMock.replay(damper);
@@ -516,6 +525,15 @@
 		EasyMock.replay(context);
 	}
 	
+	private void verifyAll() {
+		EasyMock.verify(threadPoolService);
+		EasyMock.verify(flProviderService);
+		EasyMock.verify(damper);
+		EasyMock.verify(factory);
+		EasyMock.verify(modContext);
+		EasyMock.verify(context);
+	}
+		
 	private void initPusher(int num_thread) {
 		pusher = new FlowPusher(num_thread);
 		pusher.init(context, modContext, factory, damper);
@@ -530,14 +548,7 @@
 		EasyMock.expect(req.getType()).andReturn(OFType.BARRIER_REQUEST).anyTimes();
 		EasyMock.expect(req.getLength()).andReturn((short)100).anyTimes();
 		EasyMock.replay(req);
-		EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req);
-		
-		ScheduledExecutorService executor = EasyMock.createMock(ScheduledExecutorService.class);
-		EasyMock.expect(executor.schedule((Runnable)EasyMock.anyObject(), EasyMock.anyLong(),
-				(TimeUnit)EasyMock.anyObject())).andReturn(null).once();
-		EasyMock.replay(executor);
-		EasyMock.expect(threadPoolService.getScheduledExecutor()).andReturn(executor);
-
+		EasyMock.expect(factory.getMessage(EasyMock.eq(OFType.BARRIER_REQUEST))).andReturn(req).anyTimes();
 		EasyMock.expect(sw.getNextTransactionId()).andReturn(1);
 	}