Merge pull request #457 from n-shiota/syncdev17

Added barrier handling feature to FlowPusher
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index a59a9f9..b567a87 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -4,6 +4,10 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -11,6 +15,9 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 
 public class FlowProgrammer implements IFloodlightModule {
+    @SuppressWarnings("unused")
+	private final static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
+
 	private static final boolean enableFlowSync = false;
 	
     protected volatile IFloodlightProviderService floodlightProvider;
@@ -31,7 +38,7 @@
     public void init(FloodlightModuleContext context)
 	    throws FloodlightModuleException {
 	floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
-	pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+	pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
 	if (enableFlowSync) {
 	synchronizer.init(context);
 	}
@@ -77,5 +84,4 @@
 	return l;
     }
     
-
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..f43a83e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -8,6 +8,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
 
 import org.openflow.protocol.*;
 import org.openflow.protocol.action.*;
@@ -16,7 +18,12 @@
 import org.slf4j.LoggerFactory;
 
 import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.floodlightcontroller.util.MACAddress;
 import net.floodlightcontroller.util.OFMessageDamper;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
@@ -38,9 +45,11 @@
  * @author Naoki Shiota
  *
  */
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
 
+    private static boolean barrierIfEmpty = false;
+    
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
     // The local copy should go away!
@@ -64,6 +73,12 @@
 		SUSPENDED,
 	}
 	
+	/**
+	 * Message queue attached to a switch.
+	 * This consists of queue itself and variables used for limiting sending rate.
+	 * @author Naoki Shiota
+	 *
+	 */
 	@SuppressWarnings("serial")
 	private class SwitchQueue extends ArrayDeque<OFMessage> {
 		QueueState state;
@@ -84,15 +99,20 @@
 				return true;
 			}
 			
-			long rate = last_sent_size / (current - last_sent_time);
-			
-			if (rate < max_rate) {
-				return true;
-			} else {
+			if (current == last_sent_time) {
 				return false;
 			}
+			
+			// Check if sufficient time (from aspect of rate) elapsed or not.
+			long rate = last_sent_size / (current - last_sent_time);
+			return (rate < max_rate);
 		}
 		
+		/**
+		 * Log time and size of last sent data.
+		 * @param current Time to be sent.
+		 * @param size Size of sent data (in bytes).
+		 */
 		void logSentData(long current, long size) {
 			last_sent_time = current;
 			last_sent_size = size;
@@ -100,11 +120,14 @@
 		
 	}
 	
-	private OFMessageDamper messageDamper;
+	private OFMessageDamper messageDamper = null;
+	private IThreadPoolService threadPool = null;
 
 	private FloodlightContext context = null;
 	private BasicFactory factory = null;
-	private Map<Long, FlowPusherProcess> threadMap = null;
+	private Map<Long, FlowPusherThread> threadMap = null;
+	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
 	
 	private int number_thread = 1;
 	
@@ -113,26 +136,31 @@
 	 * @author Naoki Shiota
 	 *
 	 */
-	private class FlowPusherProcess implements Runnable {
+	private class FlowPusherThread extends Thread {
 		private Map<IOFSwitch,SwitchQueue> queues
 		= new HashMap<IOFSwitch,SwitchQueue>();
 		
-		private boolean isStopped = false;
-		private boolean isMsgAdded = false;
+		private Semaphore mutex = new Semaphore(0);
 		
 		@Override
 		public void run() {
 			log.debug("Begin Flow Pusher Process");
 			
 			while (true) {
+				try {
+					// wait for message pushed to queue
+					mutex.acquire();
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+					log.debug("FlowPusherThread is interrupted");
+					return;
+				}
+				
 				Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
 				synchronized (queues) {
 					entries = queues.entrySet();
 				}
 				
-				// Set taint flag to false at this moment.
-				isMsgAdded = false;
-				
 				for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
 					IOFSwitch sw = entry.getKey();
 					SwitchQueue queue = entry.getValue();
@@ -152,15 +180,14 @@
 							int i = 0;
 							while (! queue.isEmpty()) {
 								// Number of messages excess the limit
-								if (++i >= MAX_MESSAGE_SEND) {
+								if (i >= MAX_MESSAGE_SEND) {
 									// Messages remains in queue
-									isMsgAdded = true;
+									mutex.release();
 									break;
 								}
+								++i;
 								
 								OFMessage msg = queue.poll();
-								
-								// if need to send, call IOFSwitch#write()
 								try {
 									messageDamper.write(sw, msg, context);
 									log.debug("Pusher sends message : {}", msg);
@@ -172,42 +199,50 @@
 							}
 							sw.flush();
 							queue.logSentData(current_time, size);
+							
+							if (queue.isEmpty() && barrierIfEmpty) {
+								barrier(sw);
+							}
 						}
 					}
 				}
-				
-				// sleep while all queues are empty
-				while (! (isMsgAdded || isStopped)) {
-					try {
-						Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
-					} catch (InterruptedException e) {
-						e.printStackTrace();
-						log.error("Thread.sleep failed");
-					}
-				}
-				
-				log.debug("Exit sleep loop.");
-				
-				if (isStopped) {
-					log.debug("Pusher Process finished.");
-					return;
-				}
-
 			}
 		}
 	}
 	
+	/**
+	 * Initialize object with one thread.
+	 */
 	public FlowPusher() {
-		
 	}
 	
+	/**
+	 * Initialize object with threads of given number.
+	 * @param number_thread Number of threads to handle messages.
+	 */
 	public FlowPusher(int number_thread) {
 		this.number_thread = number_thread;
 	}
 	
-	public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+	/**
+	 * Set parameters needed for sending messages.
+	 * @param context FloodlightContext used for sending messages.
+	 *        If null, FlowPusher uses default context.
+	 * @param modContext FloodlightModuleContext used for acquiring
+	 *        ThreadPoolService and registering MessageListener.
+	 * @param factory Factory object to create OFMessage objects.
+	 * @param damper Message damper used for sending messages.
+	 *        If null, FlowPusher creates its own damper object.
+	 */
+	public void init(FloodlightContext context,
+			FloodlightModuleContext modContext,
+			BasicFactory factory,
+			OFMessageDamper damper) {
 		this.context = context;
 		this.factory = factory;
+		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+		IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
 		
 		if (damper != null) {
 			messageDamper = damper;
@@ -228,12 +263,11 @@
 			return;
 		}
 		
-		threadMap = new HashMap<Long,FlowPusherProcess>();
+		threadMap = new HashMap<Long,FlowPusherThread>();
 		for (long i = 0; i < number_thread; ++i) {
-			FlowPusherProcess runnable = new FlowPusherProcess();
-			threadMap.put(i, runnable);
+			FlowPusherThread thread = new FlowPusherThread();
 			
-			Thread thread = new Thread(runnable);
+			threadMap.put(i, thread);
 			thread.start();
 		}
 	}
@@ -302,10 +336,8 @@
 			return;
 		}
 		
-		for (FlowPusherProcess runnable : threadMap.values()) {
-			if (! runnable.isStopped) {
-				runnable.isStopped = true;
-			}
+		for (FlowPusherThread t : threadMap.values()) {
+			t.interrupt();
 		}
 	}
 	
@@ -326,14 +358,14 @@
 	}
 	
 	/**
-	 * Add OFMessage to the queue related to given switch.
+	 * Add OFMessage to queue of the switch.
 	 * @param sw Switch to which message is sent.
 	 * @param msg Message to be sent.
 	 * @return true if succeed.
 	 */
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
-		FlowPusherProcess proc = getProcess(sw);
+		FlowPusherThread proc = getProcess(sw);
 		SwitchQueue queue = proc.queues.get(sw);
 
 		if (queue == null) {
@@ -349,8 +381,10 @@
 			log.debug("Message is pushed : {}", msg);
 		}
 		
-		proc.isMsgAdded = true;
-		
+		if (proc.mutex.availablePermits() == 0) {
+			proc.mutex.release();
+		}
+
 		return true;
 	}
 	
@@ -984,8 +1018,56 @@
 		
 		return add(sw,fm);
 	}
+	
+	/** Queue a BARRIER_REQUEST and block for its reply; null if sw is null or the wait fails. */
+	@Override
+	public OFBarrierReply barrier(IOFSwitch sw) {
+		OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+		if (future == null) {
+			return null;
+		}
+
+		try {
+			return future.get();
+		} catch (InterruptedException e) {
+			// Re-assert the interrupt so the calling thread can still react to it.
+			Thread.currentThread().interrupt();
+			log.error("Interrupted while waiting for BARRIER_REPLY", e);
+		} catch (ExecutionException e) {
+			log.error("Waiting for BARRIER_REPLY failed", e);
+		}
+		return null;
+	}
 
-	private SwitchQueue getQueue(IOFSwitch sw) {
+	/**
+	 * Queue a BARRIER_REQUEST for the switch without waiting for the reply.
+	 * @param sw Switch to which barrier message is pushed.
+	 * @return Future completed when receive() gets the reply, or null if sw is null.
+	 */
+	@Override
+	public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+		// TODO creation of message and future should be moved to OFSwitchImpl
+		if (sw == null) {
+			return null;
+		}
+		OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+		msg.setXid(sw.getNextTransactionId());
+		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+		// Register the future before queueing so the reply cannot arrive unmatched.
+		synchronized (barrierFutures) {
+			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+			if (map == null) {
+				map = new HashMap<Integer,OFBarrierReplyFuture>();
+				barrierFutures.put(sw.getId(), map);
+			}
+			map.put(msg.getXid(), future);
+			log.debug("Inserted future for {}", msg.getXid());
+		}
+		add(sw, msg);
+		return future;
+	}
+
+	protected SwitchQueue getQueue(IOFSwitch sw) {
 		if (sw == null)  {
 			return null;
 		}
@@ -993,14 +1075,48 @@
 		return getProcess(sw).queues.get(sw);
 	}
 	
-	private long getHash(IOFSwitch sw) {
-		// TODO should consider equalization algorithm
+	protected long getHash(IOFSwitch sw) {
+		// This code assumes DPID is sequentially assigned. TODO consider equalization algorithm
+		// Java's % keeps the dividend's sign; fold negative DPIDs into [0, number_thread).
-		return sw.getId() % number_thread;
+		return ((sw.getId() % number_thread) + number_thread) % number_thread;
 	}
 	
-	private FlowPusherProcess getProcess(IOFSwitch sw) {
+	protected FlowPusherThread getProcess(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
 		return threadMap.get(hash);
 	}
+
+	@Override
+	public String getName() {
+		return "flowpusher"; // constant name reported for this listener
+	}
+
+	@Override
+	public boolean isCallbackOrderingPrereq(OFType type, String name) {
+		return false; // unconditionally false; type and name are ignored
+	}
+
+	@Override
+	public boolean isCallbackOrderingPostreq(OFType type, String name) {
+		return false; // unconditionally false; type and name are ignored
+	}
+
+	/** Hand a BARRIER_REPLY to its pending future and drop that future from barrierFutures. */
+	@Override
+	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+		OFBarrierReplyFuture future = null;
+		// Same lock as barrierAsync(), which adds entries to this map.
+		synchronized (barrierFutures) {
+			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+			if (map != null) {
+				future = map.remove(msg.getXid());
+			}
+		}
+		if (future != null) {
+			log.debug("Received BARRIER_REPLY : {}", msg);
+			future.deliverFuture(sw, msg);
+		}
+		return Command.CONTINUE;
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index b2e4552..f3f5c50 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -2,7 +2,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -13,23 +12,8 @@
 
 import org.openflow.protocol.OFFlowMod;
 import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
 import org.openflow.protocol.OFPort;
 import org.openflow.protocol.OFStatisticsRequest;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
 import org.openflow.protocol.statistics.OFFlowStatisticsReply;
 import org.openflow.protocol.statistics.OFFlowStatisticsRequest;
 import org.openflow.protocol.statistics.OFStatistics;
@@ -37,36 +21,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.tinkerpop.blueprints.Direction;
-
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.IOFSwitch;
 import net.floodlightcontroller.core.IOFSwitchListener;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
-import net.floodlightcontroller.core.module.IFloodlightModule;
-import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.restserver.IRestApiService;
-import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.graph.GraphDBOperation;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
-import net.onrc.onos.ofcontroller.core.module.IOnosService;
-import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
 import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.ActionStripVlan;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 
 public class FlowSynchronizer implements IFlowSyncService, IOFSwitchListener {
@@ -86,8 +50,8 @@
     public void synchronize(IOFSwitch sw) {
 	Synchroizer sync = new Synchroizer(sw);
 	Thread t = new Thread(sync);
-	t.start();
 	switchThread.put(sw, t);
+	t.start(); // started only after the thread is recorded in switchThread
     }
 
     @Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..94d6e35 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,75 @@
 package net.onrc.onos.ofcontroller.flowprogrammer;
 
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
-import org.openflow.protocol.OFMessage;
-
 public interface IFlowPusherService extends IFloodlightService {
 	/**
-	 * Add a message to the queue of a switch.
-	 * @param sw
-	 * @param msg
-	 * @return
+	 * Add a message to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param msg Message object to be added.
+	 * @return true if message is successfully added to a queue.
 	 */
 	boolean add(IOFSwitch sw, OFMessage msg);
+
+	/**
+	 * Create a message from FlowEntry and add it to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param flowPath FlowPath object used for creating message.
+	 * @param flowEntry FlowEntry object used for creating message.
+	 * @return true if message is successfully added to a queue.
+	 */
 	boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+
+	/**
+	 * Create a message from IFlowEntry and add it to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param flowObj IFlowPath object used for creating message.
+	 * @param flowEntryObj IFlowEntry object used for creating message.
+	 * @return true if message is successfully added to a queue.
+	 */
 	boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+	/**
+	 * Add BARRIER message to queue and block until the reply is received.
+	 * @param sw Switch to which barrier message is pushed.
+	 * @return BARRIER_REPLY message sent from switch (FlowPusher returns null if sw is null or waiting fails).
+	 */
+	OFBarrierReply barrier(IOFSwitch sw);
+	
+	/**
+	 * Add BARRIER message to queue and return immediately, without waiting for the reply.
+	 * @param sw Switch to which barrier message is pushed.
+	 * @return Future object of BARRIER_REPLY message which will be sent from switch (FlowPusher returns null if sw is null).
+	 */
+	OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
 	
 	/**
 	 * Suspend pushing message to a switch.
-	 * @param sw
+	 * @param sw Switch for which pushing of messages is to be suspended.
 	 * @return true if success
 	 */
 	boolean suspend(IOFSwitch sw);
 	
 	/**
 	 * Resume pushing message to a switch.
-	 * @param sw
+	 * @param sw Switch for which pushing of messages is to be resumed.
 	 * @return true if success
 	 */
 	boolean resume(IOFSwitch sw);
 	
 	/**
 	 * Get whether pushing of message is suspended or not.
-	 * @param sw
-	 * @return true if suspended
+	 * @param sw Switch to be checked.
+	 * @return true if suspended.
 	 */
 	boolean isSuspended(IOFSwitch sw);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
new file mode 100644
index 0000000..3013f5a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/OFBarrierReplyFuture.java
@@ -0,0 +1,74 @@
+package net.onrc.onos.ofcontroller.flowprogrammer;
+
+import java.util.concurrent.TimeUnit;
+
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFType;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+
+/**
+ * Future for the BARRIER_REPLY that answers one BARRIER_REQUEST.
+ * handleReply() stores the reply; isFinished() then reports true.
+ */
+public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
+
+    /** True once handleReply() has stored a reply. */
+    protected volatile boolean finished;
+
+    /**
+     * Create a future without an explicit timeout.
+     * @param tp Thread pool service handed to OFMessageFuture.
+     * @param sw Switch the BARRIER_REQUEST is sent to.
+     * @param transactionId Transaction id (xid) of the BARRIER_REQUEST.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId) {
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId);
+        init();
+    }
+
+    /**
+     * Create a future with an explicit timeout.
+     * @param tp Thread pool service handed to OFMessageFuture.
+     * @param sw Switch the BARRIER_REQUEST is sent to.
+     * @param transactionId Transaction id (xid) of the BARRIER_REQUEST.
+     * @param timeout Timeout value handed to OFMessageFuture.
+     * @param unit Unit of the timeout value.
+     */
+    public OFBarrierReplyFuture(IThreadPoolService tp,
+            IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+        super(tp, sw, OFType.BARRIER_REPLY, transactionId, timeout, unit);
+        init();
+    }
+
+    /** Reset the completion flag and the stored result. */
+    private void init() {
+        this.finished = false;
+        this.result = null;
+    }
+
+    /**
+     * Store the reply and mark this future as finished.
+     * @param sw Switch passed in with the reply.
+     * @param msg Reply message; it is cast to OFBarrierReply.
+     */
+    @Override
+    protected void handleReply(IOFSwitch sw, OFMessage msg) {
+        this.result = (OFBarrierReply) msg;
+        this.finished = true;
+    }
+
+    @Override
+    protected boolean isFinished() {
+        return finished;
+    }
+
+    @Override
+    protected void unRegister() {
+        super.unRegister();
+    }
+}