Merge pull request #459 from n-shiota/syncdev17

I added Javadoc comments to the FlowProgrammer-related modules and made the small modifications below:

Changed unit for representing rate from bytes/sec to bytes/ms (for computation convenience)
Added explicit creation/deletion API to FlowPusher (implicit creation as before is also available)
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
index 45f59b3..33c9a6a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowProgrammer.java
@@ -22,21 +22,34 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.registry.controller.IControllerRegistryService;
 
+/**
+ * FlowProgrammer is a module responsible for maintaining flows installed on switches.
+ * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
+ * flows between GraphDB and switches.
+ * FlowProgrammer also watches switch addition/deletion events to
+ * start/stop synchronization. When a switch is added to the network, FlowProgrammer
+ * immediately kicks off synchronization to bring the switch's flow table up to date.
+ * Conversely, when a switch is removed from the network, FlowProgrammer immediately
+ * stops synchronization.
+ * @author Brian
+ *
+ */
 public class FlowProgrammer implements IFloodlightModule, 
 				       IOFMessageListener,
 				       IOFSwitchListener {
     @SuppressWarnings("unused")
+    // flag to enable FlowSynchronizer
     private static final boolean enableFlowSync = false;
     protected static Logger log = LoggerFactory.getLogger(FlowProgrammer.class);
     protected volatile IFloodlightProviderService floodlightProvider;
     protected volatile IControllerRegistryService registryService;
 
-
     protected FlowPusher pusher;
     private static final int NUM_PUSHER_THREAD = 1;
 
     protected FlowSynchronizer synchronizer;
-        
+    
     public FlowProgrammer() {
 	pusher = new FlowPusher(NUM_PUSHER_THREAD);
 	if (enableFlowSync) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
index f43a83e..6f85c5d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowPusher.java
@@ -40,16 +40,20 @@
 import net.onrc.onos.ofcontroller.util.Port;
 
 /**
- * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
- * messages to switches in proper rate.
+ * FlowPusher is an implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance to each switch.
+ * The number of message processing threads is configurable via the constructor, and
+ * one thread can handle multiple message queues. Each queue is assigned to
+ * a thread according to the hash function defined by getHash().
+ * Each processing thread reads messages from its queues and sends them to switches
+ * in round-robin order. Each processing thread also calculates the sending rate to
+ * suppress excessive message sending.
  * @author Naoki Shiota
  *
  */
 public class FlowPusher implements IFlowPusherService, IOFMessageListener {
     private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
 
-    private static boolean barrierIfEmpty = false;
-    
     // NOTE: Below are moved from FlowManager.
     // TODO: Values copied from elsewhere (class LearningSwitch).
     // The local copy should go away!
@@ -57,10 +61,6 @@
     protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
     protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
     
-    // Interval of sleep when queue is empty
-    protected static final long SLEEP_MILLI_SEC = 10;
-    protected static final int SLEEP_NANO_SEC = 0;
-    
     // Number of messages sent to switch at once
     protected static final int MAX_MESSAGE_SEND = 100;
 
@@ -74,7 +74,7 @@
 	}
 	
 	/**
-	 * Message queue attached to a switch.
+	 * SwitchQueue represents a message queue attached to a switch.
 	 * This consists of queue itself and variables used for limiting sending rate.
 	 * @author Naoki Shiota
 	 *
@@ -83,11 +83,14 @@
 	private class SwitchQueue extends ArrayDeque<OFMessage> {
 		QueueState state;
 		
-		// Max rate of sending message (bytes/sec). 0 implies no limitation.
+		// Max rate of sending message (bytes/ms). 0 implies no limitation.
 		long max_rate = 0;	// 0 indicates no limitation
 		long last_sent_time = 0;
 		long last_sent_size = 0;
 		
+		// "To be deleted" flag
+		boolean toBeDeleted = false;
+		
 		/**
 		 * Check if sending rate is within the rate
 		 * @param current Current time
@@ -125,7 +128,10 @@
 
 	private FloodlightContext context = null;
 	private BasicFactory factory = null;
+	
+	// Map of threads versus dpid
 	private Map<Long, FlowPusherThread> threadMap = null;
+	// Map of Future objects versus dpid and transaction ID.
 	private Map<Long, Map<Integer, OFBarrierReplyFuture>>
 		barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
 	
@@ -138,14 +144,12 @@
 	 */
 	private class FlowPusherThread extends Thread {
 		private Map<IOFSwitch,SwitchQueue> queues
-		= new HashMap<IOFSwitch,SwitchQueue>();
+			= new HashMap<IOFSwitch,SwitchQueue>();
 		
 		private Semaphore mutex = new Semaphore(0);
 		
 		@Override
 		public void run() {
-			log.debug("Begin Flow Pusher Process");
-			
 			while (true) {
 				try {
 					// wait for message pushed to queue
@@ -172,7 +176,7 @@
 					}
 					
 					// check sending rate and determine it to be sent or not
-					long current_time = System.nanoTime();
+					long current_time = System.currentTimeMillis();
 					long size = 0;
 					
 					synchronized (queue) {
@@ -200,8 +204,13 @@
 							sw.flush();
 							queue.logSentData(current_time, size);
 							
-							if (queue.isEmpty() && barrierIfEmpty) {
-								barrier(sw);
+							if (queue.isEmpty()) {
+								// remove queue if flagged to be.
+								if (queue.toBeDeleted) {
+									synchronized (queues) {
+										queues.remove(sw);
+									}
+								}
 							}
 						}
 					}
@@ -247,7 +256,7 @@
 		if (damper != null) {
 			messageDamper = damper;
 		} else {
-			// use default value
+			// use default values
 			messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
 				    EnumSet.of(OFType.FLOW_MOD),
 				    OFMESSAGE_DAMPER_TIMEOUT);
@@ -272,10 +281,6 @@
 		}
 	}
 	
-	/**
-	 * Suspend sending messages to switch.
-	 * @param sw
-	 */
 	@Override
 	public boolean suspend(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
@@ -293,9 +298,6 @@
 		}
 	}
 
-	/**
-	 * Resume sending messages to switch.
-	 */
 	@Override
 	public boolean resume(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
@@ -313,9 +315,6 @@
 		}
 	}
 	
-	/**
-	 * Check if given switch is suspended.
-	 */
 	@Override
 	public boolean isSuspended(IOFSwitch sw) {
 		SwitchQueue queue = getQueue(sw);
@@ -341,11 +340,7 @@
 		}
 	}
 	
-	/**
-	 * Set sending rate to a switch.
-	 * @param sw Switch.
-	 * @param rate Rate in bytes/sec.
-	 */
+	@Override
 	public void setRate(IOFSwitch sw, long rate) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue == null) {
@@ -356,24 +351,62 @@
 			queue.max_rate = rate;
 		}
 	}
+
+	@Override
+	public boolean createQueue(IOFSwitch sw) {
+		SwitchQueue queue = getQueue(sw);
+		if (queue != null) {
+			return false;
+		}
+		
+		FlowPusherThread proc = getProcess(sw);
+		queue = new SwitchQueue();
+		queue.state = QueueState.READY;
+		synchronized (proc) {
+			proc.queues.put(sw, queue);
+		}
+		
+		return true;
+	}
+
+	@Override
+	public boolean deleteQueue(IOFSwitch sw) {
+		return deleteQueue(sw, false);
+	}
 	
-	/**
-	 * Add OFMessage to queue of the switch.
-	 * @param sw Switch to which message is sent.
-	 * @param msg Message to be sent.
-	 * @return true if succeed.
-	 */
+	@Override
+	public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+		FlowPusherThread proc = getProcess(sw);
+		
+		if (forceStop) {
+			synchronized (proc.queues) {
+				SwitchQueue queue = proc.queues.remove(sw);
+				if (queue == null) {
+					return false;
+				}
+			}
+			return true;
+		} else {
+			SwitchQueue queue = getQueue(sw);
+			if (queue == null) {
+				return false;
+			}
+			synchronized (queue) {
+				queue.toBeDeleted = true;
+			}
+			return true;
+		}
+	}
+	
 	@Override
 	public boolean add(IOFSwitch sw, OFMessage msg) {
 		FlowPusherThread proc = getProcess(sw);
 		SwitchQueue queue = proc.queues.get(sw);
 
+		// create queue at first addition of message
 		if (queue == null) {
-			queue = new SwitchQueue();
-			queue.state = QueueState.READY;
-			synchronized (proc) {
-				proc.queues.put(sw, queue);
-			}
+			createQueue(sw);
+			queue = getQueue(sw);
 		}
 		
 		synchronized (queue) {
@@ -388,13 +421,6 @@
 		return true;
 	}
 	
-	/**
-	 * Create OFMessage from given flow information and add it to the queue.
-	 * @param sw Switch to which message is sent.
-	 * @param flowObj FlowPath.
-	 * @param flowEntryObj FlowEntry.
-	 * @return true if succeed.
-	 */
 	@Override
 	public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
 		log.debug("sending : {}, {}", sw, flowObj);
@@ -700,13 +726,6 @@
 		return true;
 	}
 	
-	/**
-	 * Create OFMessage from given flow information and add it to the queue.
-	 * @param sw Switch to which message is sent.
-	 * @param flowPath FlowPath.
-	 * @param flowEntry FlowEntry.
-	 * @return true if secceed.
-	 */
 	@Override
 	public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
 		//
@@ -1061,12 +1080,16 @@
 				barrierFutures.put(sw.getId(), map);
 			}
 			map.put(msg.getXid(), future);
-			log.debug("Inserted future for {}", msg.getXid());
 		}
 		
 		return future;
 	}
 
+	/**
+	 * Get a queue attached to a switch.
+	 * @param sw Switch object
+	 * @return Queue object
+	 */
 	protected SwitchQueue getQueue(IOFSwitch sw) {
 		if (sw == null)  {
 			return null;
@@ -1075,12 +1098,22 @@
 		return getProcess(sw).queues.get(sw);
 	}
 	
+	/**
+	 * Get the hash value corresponding to a switch.
+	 * @param sw Switch object
+	 * @return Hash value
+	 */
 	protected long getHash(IOFSwitch sw) {
 		// This code assumes DPID is sequentially assigned.
 		// TODO consider equalization algorithm
 		return sw.getId() % number_thread;
 	}
-	
+
+	/**
+	 * Get a Thread object which processes the queue attached to a switch.
+	 * @param sw Switch object
+	 * @return Thread object
+	 */
 	protected FlowPusherThread getProcess(IOFSwitch sw) {
 		long hash = getHash(sw);
 		
@@ -1119,4 +1152,5 @@
 		
 		return Command.CONTINUE;
 	}
+
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
index 7bda71f..0d6b0e8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/FlowSynchronizer.java
@@ -28,6 +28,14 @@
 import net.onrc.onos.ofcontroller.util.Dpid;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
 
+/**
+ * FlowSynchronizer is an implementation of FlowSyncService.
+ * In addition to implementing IFlowSyncService, FlowSynchronizer periodically reads flow
+ * tables from switches and compares them with GraphDB to drop unnecessary flows and/or
+ * to install missing flows.
+ * @author Brian
+ *
+ */
 public class FlowSynchronizer implements IFlowSyncService {
 
     private static Logger log = LoggerFactory.getLogger(FlowSynchronizer.class);
@@ -57,11 +65,20 @@
 	}	
     }
 
+    /**
+     * Initialize Synchronizer.
+     * @param pusherService FlowPusherService used for sending messages.
+     */
     public void init(IFlowPusherService pusherService) {
 	pusher = pusherService;
     }
 
-    protected class Synchronizer implements Runnable {
+    /**
+     * Synchronizer represents the main thread of synchronization.
+     * @author Brian
+     *
+     */
+	protected class Synchronizer implements Runnable {
 	IOFSwitch sw;
 	ISwitchObject swObj;
 
@@ -81,6 +98,12 @@
 	    //pusher.resume(sw);
 	}
 
+	/**
+	 * Compare flow entries in GraphDB and in the switch to pick up the necessary messages.
+	 * After being picked up, the messages are added to FlowPusher.
+	 * @param graphEntries Flow entries in GraphDB.
+	 * @param switchEntries Flow entries in switch.
+	 */
 	private void compare(Set<FlowEntryWrapper> graphEntries, Set<FlowEntryWrapper> switchEntries) {
 	    int added = 0, removed = 0, skipped = 0;
 	    for(FlowEntryWrapper entry : switchEntries) {
@@ -104,6 +127,10 @@
 		      "Flow entries skipped " + skipped);
 	}
 
+	/**
+	 * Read GraphDB to get FlowEntries associated with a switch.
+	 * @return set of FlowEntries
+	 */
 	private Set<FlowEntryWrapper> getFlowEntriesFromGraph() {
 	    Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
 	    for(IFlowEntry entry : swObj.getFlowEntries()) {
@@ -113,6 +140,10 @@
 	    return entries;	    
 	}
 
+	/**
+	 * Read flow table from switch and derive FlowEntries from table.
+	 * @return set of FlowEntries
+	 */
 	private Set<FlowEntryWrapper> getFlowEntriesFromSwitch() {
 
 	    int lengthU = 0;
@@ -159,6 +190,12 @@
 
     }
 
+    /**
+     * FlowEntryWrapper represents abstract FlowEntry which is embodied by IFlowEntry
+     * (from GraphDB) or OFFlowStatisticsReply (from switch).
+     * @author Brian
+     *
+     */
     class FlowEntryWrapper {
 	FlowEntryId id;
 	IFlowEntry iflowEntry;
@@ -174,6 +211,10 @@
 	    id = new FlowEntryId(entry.getCookie());
 	}
 
+	/**
+	 * Install this FlowEntry to a switch via FlowPusher.
+	 * @param sw Switch to which flow will be installed.
+	 */
 	public void addToSwitch(IOFSwitch sw) {
 	    if(iflowEntry != null) {
 		pusher.add(sw, iflowEntry.getFlow(), iflowEntry);
@@ -184,6 +225,10 @@
 	    }
 	}
 	
+	/**
+	 * Remove this FlowEntry from a switch via FlowPusher.
+	 * @param sw Switch from which flow will be removed.
+	 */
 	public void removeFromSwitch(IOFSwitch sw){
 	    if(iflowEntry != null) {
 		log.error("Removing non-existent flow entry {} from sw {}", 
@@ -216,6 +261,7 @@
 	 * the same value; otherwise, returns false.
 	 * 
 	 * @param Object to compare
+	 * @return true if the object has the same Flow Entry ID.
 	 */
 	@Override
 	public boolean equals(Object obj){
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
index 94d6e35..20a6249 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowPusherService.java
@@ -11,8 +11,44 @@
 import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowPath;
 
+/**
+ * FlowPusherService is a service that sends messages to switches at a proper rate.
+ * Conceptually, a queue is attached to each switch, and FlowPusherService
+ * reads messages from the queue and sends them to the switch in order.
+ * To guarantee that messages have been installed, FlowPusherService can add a barrier
+ * message to the queue and can notify when the barrier message is sent to the switch.
+ * @author Naoki Shiota
+ *
+ */
 public interface IFlowPusherService extends IFloodlightService {
 	/**
+	 * Create a queue corresponding to the switch.
+	 * @param sw Switch to which the new queue is attached.
+	 * @return true if a new queue is successfully created.
+	 */
+	boolean createQueue(IOFSwitch sw);
+
+	/**
+	 * Delete the queue corresponding to the switch.
+	 * Messages remaining in the queue will all be sent before the queue is deleted.
+	 * @param sw Switch whose queue is deleted.
+	 * @return true if the queue is successfully deleted.
+	 */
+	boolean deleteQueue(IOFSwitch sw);
+	
+	/**
+	 * Delete the queue corresponding to the switch.
+	 * If the force flag is set, the queue will be deleted immediately.
+	 * @param sw Switch whose queue is deleted.
+	 * @param forceStop If this flag is set to true, the queue will be deleted
+	 *        immediately regardless of any messages in the queue.
+	 *        If false, all messages will be sent to the switch and the queue will
+	 *        be deleted after that.
+	 * @return true if the queue is successfully deleted or flagged to be deleted.
+	 */
+	boolean deleteQueue(IOFSwitch sw, boolean forceStop);
+	
+	/**
 	 * Add a message to the queue of the switch.
 	 * @param sw Switch to which message is pushed.
 	 * @param msg Message object to be added.
@@ -39,6 +75,13 @@
 	boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj);
 
 	/**
+	 * Set sending rate to a switch.
+	 * @param sw Switch.
+	 * @param rate Rate in bytes/ms.
+	 */
+	public void setRate(IOFSwitch sw, long rate);
+	
+	/**
 	 * Add BARRIER message to queue and wait for reply.
 	 * @param sw Switch to which barrier message is pushed.
 	 * @return BARRIER_REPLY message sent from switch.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
index 2b9d51d..4e6efaf 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowprogrammer/IFlowSyncService.java
@@ -5,7 +5,9 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 
 /**
- * @author bocon
+ * FlowSyncService is a service to synchronize GraphDB and switch's flow table.
+ * FlowSyncService offers APIs to trigger and interrupt synchronization explicitly.
+ * @author Brian
  *
  */
 public interface IFlowSyncService extends IFloodlightService {