Merge branch 'syncdev' into syncdev17
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index a59a9f9..ebc86e4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -4,14 +4,16 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+
 import net.floodlightcontroller.core.IFloodlightProviderService;
 import net.floodlightcontroller.core.module.FloodlightModuleContext;
 import net.floodlightcontroller.core.module.FloodlightModuleException;
 import net.floodlightcontroller.core.module.IFloodlightModule;
 import net.floodlightcontroller.core.module.IFloodlightService;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
 
 public class FlowProgrammer implements IFloodlightModule {
-	private static final boolean enableFlowSync = false;
+	private static final boolean enableFlowSync = true;
 	
     protected volatile IFloodlightProviderService floodlightProvider;
 
@@ -31,7 +33,7 @@
     public void init(FloodlightModuleContext context)
 	    throws FloodlightModuleException {
 	floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
-	pusher.init(null, floodlightProvider.getOFMessageFactory(), null);
+	pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
 	if (enableFlowSync) {
 	synchronizer.init(context);
 	}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index 532477a..6379f26 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -8,6 +8,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import org.openflow.protocol.*;
 import org.openflow.protocol.action.*;
@@ -16,7 +17,12 @@
 import org.slf4j.LoggerFactory;
 
 import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
 import net.floodlightcontroller.util.MACAddress;
 import net.floodlightcontroller.util.OFMessageDamper;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
@@ -38,7 +44,7 @@
  * @author Naoki Shiota
  *
  */
-public class FlowPusher implements IFlowPusherService {
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
 
     // NOTE: Below are moved from FlowManager.
@@ -64,6 +70,12 @@
 		SUSPENDED,
 	}
 	
+	/**
+	 * Message queue attached to a switch.
+	 * Holds the queued messages together with the state used to limit the sending rate.
+	 * @author Naoki Shiota
+	 *
+	 */
 	@SuppressWarnings("serial")
 	private class SwitchQueue extends ArrayDeque<OFMessage> {
 		QueueState state;
@@ -84,13 +96,9 @@
 				return true;
 			}
 			
+			// Sendable only if the rate computed since the last send is below max_rate.
 			long rate = last_sent_size / (current - last_sent_time);
-			
-			if (rate < max_rate) {
-				return true;
-			} else {
-				return false;
-			}
+			return (rate < max_rate);
 		}
 		
 		void logSentData(long current, long size) {
@@ -100,11 +108,14 @@
 		
 	}
 	
-	private OFMessageDamper messageDamper;
+	private OFMessageDamper messageDamper = null;
+	private IThreadPoolService threadPool = null;
 
 	private FloodlightContext context = null;
 	private BasicFactory factory = null;
 	private Map<Long, FlowPusherProcess> threadMap = null;
+	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
+		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
 	
 	private int number_thread = 1;
 	
@@ -197,17 +208,39 @@
 		}
 	}
 	
+	/**
+	 * Initialize object with the default single thread.
+	 */
 	public FlowPusher() {
-		
 	}
 	
+	/**
+	 * Initialize object with the given number of threads.
+	 * @param number_thread Number of threads to handle messages; getHash() uses it as a modulus, so it must not be 0.
+	 */
 	public FlowPusher(int number_thread) {
 		this.number_thread = number_thread;
 	}
 	
-	public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
+	/**
+	 * Set parameters needed for sending messages.
+	 * @param context FloodlightContext used for sending messages.
+	 *        If null, FlowPusher uses default context.
+	 * @param modContext FloodlightModuleContext used for acquiring
+	 *        ThreadPoolService and registering MessageListener.
+	 * @param factory Factory object to create OFMessage objects.
+	 * @param damper Message damper used for sending messages.
+	 *        If null, FlowPusher creates its own damper object.
+	 */
+	public void init(FloodlightContext context,
+			FloodlightModuleContext modContext,
+			BasicFactory factory,
+			OFMessageDamper damper) {
 		this.context = context;
 		this.factory = factory;
+		this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+		IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
+		flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
 		
 		if (damper != null) {
 			messageDamper = damper;
@@ -326,7 +359,7 @@
 	}
 	
 	/**
-	 * Add OFMessage to the queue related to given switch.
+	 * Add OFMessage to queue of the switch.
 	 * @param sw Switch to which message is sent.
 	 * @param msg Message to be sent.
 	 * @return true if succeed.
@@ -984,8 +1017,52 @@
 		
 		return add(sw,fm);
 	}
+	
+	@Override
+	public OFBarrierReply barrier(IOFSwitch sw) {
+		OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+		if (future == null) {
+			return null;
+		}
+		
+		try {
+			return future.get(); // blocks until the BARRIER_REPLY future completes
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+			log.error("Interrupted while waiting for BARRIER_REPLY", e);
+			return null;
+		} catch (ExecutionException e) {
+			// Pass the exception as the Throwable argument (no "{}") so SLF4J logs its stack trace.
+			log.error("Failed to get BARRIER_REPLY", e);
+			return null;
+		}
+	}
 
-	private SwitchQueue getQueue(IOFSwitch sw) {
+	@Override
+	public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+		// TODO creation of message and future should be moved to OFSwitchImpl
+		OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
+		msg.setXid(sw.getNextTransactionId());
+		// TODO create Future object of message
+		OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+
+		// Register the future BEFORE queueing the request, so that a fast
+		// BARRIER_REPLY cannot reach receive() while the future is still unknown.
+		synchronized (barrierFutures) {
+			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+			if (map == null) {
+				map = new HashMap<Integer,OFBarrierReplyFuture>();
+				barrierFutures.put(sw.getId(), map);
+			}
+			map.put(msg.getXid(), future);
+			log.debug("Created future for {}", msg.getXid());
+		}
+
+		add(sw, msg);
+		return future;
+	}
+
+	protected SwitchQueue getQueue(IOFSwitch sw) {
 		if (sw == null)  {
 			return null;
 		}
@@ -993,14 +1070,53 @@
 		return getProcess(sw).queues.get(sw);
 	}
 	
-	private long getHash(IOFSwitch sw) {
-		// TODO should consider equalization algorithm
-		return sw.getId() % number_thread;
+	protected long getHash(IOFSwitch sw) {
+		// Assumes DPID is sequentially assigned. TODO consider equalization algorithm
+		// Double modulo keeps the result in [0, number_thread) even for a negative DPID.
+		return ((sw.getId() % number_thread) + number_thread) % number_thread;
 	}
 	
-	private FlowPusherProcess getProcess(IOFSwitch sw) {
+	protected FlowPusherProcess getProcess(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
 		return threadMap.get(hash); // long key autoboxes to Long, the key type of threadMap
 	}
+
+	@Override
+	public String getName() {
+		return "flowpusher"; // fixed listener name
+	}
+
+	@Override
+	public boolean isCallbackOrderingPrereq(OFType type, String name) {
+		return false; // always false, whatever type/name is asked about
+	}
+
+	@Override
+	public boolean isCallbackOrderingPostreq(OFType type, String name) {
+		return false; // always false, whatever type/name is asked about
+	}
+
+	@Override
+	public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+		// Only BARRIER_REPLY is registered in init(), so no type check is needed.
+		OFBarrierReplyFuture future = null;
+
+		// barrierAsync() mutates barrierFutures under this same lock; plain
+		// HashMaps must not be read concurrently with those writes.
+		synchronized (barrierFutures) {
+			Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
+			if (map != null) {
+				// Remove on delivery; otherwise the map grows by one entry
+				// for every barrier request that gets answered.
+				future = map.remove(msg.getXid());
+			}
+		}
+
+		if (future != null) {
+			log.debug("Received BARRIER_REPLY : {}", msg);
+			future.deliverFuture(sw, msg);
+		}
+		return Command.CONTINUE;
+	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index e16dd20..94d6e35 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -1,43 +1,75 @@
 package net.onrc.onos.ofcontroller.flowprogrammer;
 
+import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFMessage;
+
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
-import org.openflow.protocol.OFMessage;
-
 public interface IFlowPusherService extends IFloodlightService {
 	/**
-	 * Add a message to the queue of a switch.
-	 * @param sw
-	 * @param msg
-	 * @return
+	 * Add a message to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param msg Message object to be added.
+	 * @return true if message is successfully added to a queue.
 	 */
 	boolean add(IOFSwitch sw, OFMessage msg);
+
+	/**
+	 * Create a message from FlowEntry and add it to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param flowPath FlowPath object used for creating message.
+	 * @param flowEntry FlowEntry object used for creating message.
+	 * @return true if message is successfully added to a queue.
+	 */
 	boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry);
+
+	/**
+	 * Create a message from IFlowEntry and add it to the queue of the switch.
+	 * @param sw Switch to which message is pushed.
+	 * @param flowObj IFlowPath object used for creating message.
+	 * @param flowEntryObj IFlowEntry object used for creating message.
+	 * @return true if message is successfully added to a queue.
+	 */
 	boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
+
+	/**
+	 * Add BARRIER message to queue and block until the reply arrives.
+	 * @param sw Switch to which barrier message is pushed.
+	 * @return BARRIER_REPLY message sent from switch (FlowPusher returns null if waiting fails).
+	 */
+	OFBarrierReply barrier(IOFSwitch sw);
+	
+	/**
+	 * Add BARRIER message to queue and return without waiting for the reply.
+	 * @param sw Switch to which barrier message is pushed.
+	 * @return Future object of BARRIER_REPLY message which will be sent from switch.
+	 */
+	OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw);
 	
 	/**
 	 * Suspend pushing message to a switch.
-	 * @param sw
+	 * @param sw Switch for which message pushing is suspended.
 	 * @return true if success
 	 */
 	boolean suspend(IOFSwitch sw);
 	
 	/**
 	 * Resume pushing message to a switch.
-	 * @param sw
+	 * @param sw Switch for which message pushing is resumed.
 	 * @return true if success
 	 */
 	boolean resume(IOFSwitch sw);
 	
 	/**
 	 * Get whether pushing of message is suspended or not.
-	 * @param sw
-	 * @return true if suspended
+	 * @param sw Switch to be checked.
+	 * @return true if suspended.
 	 */
 	boolean isSuspended(IOFSwitch sw);
 }