Refactored FlowPusher
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 61d8277..6c7a2bb 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -1,15 +1,28 @@
 package net.onrc.onos.ofcontroller.flowmanager;
 
 import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
-import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.*;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IOFSwitch;
-import net.onrc.onos.ofcontroller.flowmanager.FlowQueueTable.QueueState;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
 
 /**
  * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
@@ -18,104 +31,508 @@
  *
  */
 public class FlowPusher {
-	private FloodlightContext context;
-	private FlowQueueTable flowQueueTable = null;
-	private Thread thread;
+    private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+
+    // NOTE: Below are moved from FlowManager.
+    // TODO: Values copied from elsewhere (class LearningSwitch).
+    // The local copy should go away!
+    //
+    protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+    protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms
+
+    public static final short PRIORITY_DEFAULT = 100;
+    public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0;	// infinity
+    public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0;	// infinite
+
+	public enum QueueState {	// state of a per-switch message queue
+		READY,		// the pusher thread sends messages from this queue
+		SUSPENDED,	// the pusher thread skips this queue until resume() is called
+	}
 	
-	/**
-	 * Represents state of queue.
-	 * This is used for calculation of rate.
-	 * @author Naoki Shiota
-	 *
-	 */
-	private static class RateInfo {
+	private class SwitchQueue extends ArrayDeque<OFMessage> {
+		// A new queue starts READY; the pusher thread only services READY queues.
+		QueueState state = QueueState.READY;
+		// Max rate of sending message (bytes/sec). 0 implies no limitation.
+		long max_rate = 0;
 		long last_sent_time = 0;
 		long last_sent_size = 0;
+		
+		/**
+		 * Check if sending rate is within the rate
+		 * @param current Current time in nanoseconds (System.nanoTime())
+		 * @return true if unlimited, nothing was sent yet, or within the rate
+		 */
+		boolean isSendable(long current) {
+			long elapsed = current - last_sent_time;
+			if (max_rate == 0 || last_sent_size == 0) {
+				return true;
+			} else if (elapsed <= 0) {
+				return false;	// no time has passed; also avoids dividing by zero
+			}
+			return (last_sent_size * 1000000000L / elapsed) < max_rate;	// bytes/sec
+		}
+		
+		void updateRate(long current, OFMessage msg) {
+			last_sent_time = current;
+			last_sent_size = msg.getLengthU();
+		}
+		
 	}
+	
+	// Written by send() while the pusher thread iterates it, so it must be a concurrent map.
+	private Map<IOFSwitch,SwitchQueue> queues = new java.util.concurrent.ConcurrentHashMap<IOFSwitch,SwitchQueue>();
+	
+	private OFMessageDamper messageDamper;
 
-	private Map<Long, RateInfo> queue_rateinfos =
-			new HashMap<Long, RateInfo>();
-
+	private FloodlightContext context = null;
+	private BasicFactory factory = null;
+	private Thread thread = null;
+	
+	
 	private class FlowPusherProcess implements Runnable {
 		@Override
 		public void run() {
-			if (flowQueueTable == null) {
-				return;
-			}
 			
 			while (true) {
-				for (IOFSwitch sw : flowQueueTable.getSwitches()) {
+				for (Map.Entry<IOFSwitch,SwitchQueue> entry : queues.entrySet()) {
+					IOFSwitch sw = entry.getKey();
+					SwitchQueue queue = entry.getValue();
+
 					// Skip if queue is suspended
-					if (flowQueueTable.getQueueState(sw) != QueueState.READY) {
+					if (sw == null || queue == null ||
+							queue.state != QueueState.READY) {
 						continue;
 					}
 					
-					// Skip if queue is locked
-					if (! flowQueueTable.lockQueueIfAvailable(sw)) {
-						continue;
-					}
-					
-					long dpid = sw.getId();
-					Queue<OFMessage> queue = flowQueueTable.getQueue(sw);
-					
-					if (queue == null) {
-						flowQueueTable.unlockQueue(sw);
-						continue;
-					}
-					
-					OFMessage msg = queue.poll();
-					if (msg == null) {
-						flowQueueTable.unlockQueue(sw);
-						continue;
-					}
-					
-					RateInfo state = queue_rateinfos.get(dpid);
-					if (state == null) {
-						queue_rateinfos.put(dpid, new RateInfo());
-					}
-					
-					// check sending rate and determine it to be sent or not
-					long current_time = System.nanoTime();
-					long rate = state.last_sent_size / (current_time - state.last_sent_time);
-					
-					// if need to send, call IOFSwitch#write()
-					if (rate < flowQueueTable.getQueueRate(sw)) {
-						try {
-							sw.write(msg, context);
-							state.last_sent_time = current_time;
-							state.last_sent_size = msg.getLengthU();
-						} catch (IOException e) {
-							// TODO Auto-generated catch block
-							e.printStackTrace();
+					synchronized (queue) {
+						// TODO send multiple messages at once
+						while (! queue.isEmpty()) {
+							// check sending rate and determine it to be sent or not
+							long current_time = System.nanoTime();
+							if (! queue.isSendable(current_time)) {
+								// over the rate: leave the message queued for a later pass
+								break;
+							}
+							
+							// within the rate: dequeue it and hand it to the message damper
+							OFMessage msg = queue.poll();
+							try {
+								messageDamper.write(sw, msg, context);
+								queue.updateRate(current_time, msg);
+							} catch (IOException e) {
+								log.error("Failed to write message to switch " + sw.getStringId(), e);
+							}
 						}
 					}
-					
-					flowQueueTable.unlockQueue(sw);
 				}
 				
 				// sleep while all queues are empty
 				boolean sleep = true;
 				do {
+					try { Thread.sleep(1); sleep = false; } catch (InterruptedException e) { return; }
 					// TODO check if queues are empty
 				} while (sleep);
 			}
 		}
 	}
 	
-	public FlowPusher(FlowQueueTable table, FloodlightContext context) {
-		flowQueueTable = table;
+	/** Stores the context and message factory and creates the FLOW_MOD message damper. */
+	public void init(FloodlightContext context, BasicFactory factory) {
 		this.context = context;
+		this.factory = factory;
+		messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+			    EnumSet.of(OFType.FLOW_MOD), OFMESSAGE_DAMPER_TIMEOUT);
 	}
-	
-	public void startProcess() {
+	/**
+	 * Begin processing queues on a new thread. Does nothing if context or factory is not set.
+	 */
+	public void start() {
+		if (context == null || factory == null) {
+			// not yet initialized
+			return;
+		}
+		
 		thread = new Thread(new FlowPusherProcess());
 		thread.start();
 	}
 	
-	public void stopProcess() {
+	/**
+	 * Mark the queue of the given switch as SUSPENDED.
+	 * Nothing happens when the switch is null, has no queue,
+	 * or its queue is not currently READY.
+	 * @param sw switch whose queue is to be suspended
+	 */
+	public void suspend(IOFSwitch sw) {
+		SwitchQueue target = getQueue(sw);
+		if (target != null) {
+			synchronized (target) {
+				boolean ready = (target.state == QueueState.READY);
+				if (ready) {
+					target.state = QueueState.SUSPENDED;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Mark the queue of the given switch as READY again.
+	 * Nothing happens when the switch is null, has no queue,
+	 * or its queue is not currently SUSPENDED.
+	 * @param sw switch whose queue is to be resumed
+	 */
+	public void resume(IOFSwitch sw) {
+		SwitchQueue target = getQueue(sw);
+		if (target != null) {
+			synchronized (target) {
+				if (target.state == QueueState.SUSPENDED) {
+					target.state = QueueState.READY;
+				}
+			}
+		}
+	}
+
+	/**
+	 * End processing queue and exit thread. NOTE: not implemented yet; this is currently a no-op.
+	 */
+	public void stop() {
 		if (thread != null && thread.isAlive()) {
 			// TODO tell thread to halt
 		}
 	}
 	
+	/**
+	 * Add OFMessage to the queue related to given switch. Returns false only if sw or msg is null.
+	 * @param sw switch to send to; its queue is created on first use
+	 * @param msg message to enqueue
+	 */
+	public boolean send(IOFSwitch sw, OFMessage msg) {
+		if (sw == null || msg == null) { return false; }	// nothing to enqueue
+		SwitchQueue queue = getQueue(sw);
+		if (queue == null) {
+			queue = new SwitchQueue();	// keep the reference so it can be locked below
+			queues.put(sw, queue);
+		}
+		synchronized (queue) {
+			queue.add(msg);
+		}
+		return true;
+	}
+	
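+	/**
+	 * Create an OFFlowMod message from the given flow information and add it to the switch's queue.
+	 * @param sw switch the flow_mod is queued for
+	 * @param flowObj flow path that supplies match conditions the flow entry does not set itself
+	 * @param flowEntryObj flow entry to push; its switch state is set to FE_SWITCH_UPDATED once queued
+	 * @return false if the flow entry ID or user state is missing, or the user state is unknown; true otherwise
+	 */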
+	public boolean send(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
+		String flowEntryIdStr = flowEntryObj.getFlowEntryId();
+		if (flowEntryIdStr == null)
+		    return false;
+		FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
+		String userState = flowEntryObj.getUserState();
+		if (userState == null)
+		    return false;
+
+		//
+		// Create the Open Flow Flow Modification Entry to push
+		//
+		OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
+		long cookie = flowEntryId.value();
+
+		short flowModCommand = OFFlowMod.OFPFC_ADD;
+		if (userState.equals("FE_USER_ADD")) {
+		    flowModCommand = OFFlowMod.OFPFC_ADD;
+		} else if (userState.equals("FE_USER_MODIFY")) {
+		    flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+		} else if (userState.equals("FE_USER_DELETE")) {
+		    flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+		} else {
+		    // Unknown user state. Ignore the entry
+		    log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+			      flowEntryId.toString(), userState);
+		    return false;
+		}
+
+		//
+		// Fetch the match conditions.
+		//
+		// NOTE: The Flow matching conditions common for all Flow Entries are
+		// used ONLY if a Flow Entry does NOT have the corresponding matching
+		// condition set.
+		//
+		OFMatch match = new OFMatch();
+		match.setWildcards(OFMatch.OFPFW_ALL);
+
+		// Match the Incoming Port
+		Short matchInPort = flowEntryObj.getMatchInPort();
+		if (matchInPort != null) {
+		    match.setInputPort(matchInPort);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+		}
+
+		// Match the Source MAC address
+		String matchSrcMac = flowEntryObj.getMatchSrcMac();
+		if (matchSrcMac == null)
+		    matchSrcMac = flowObj.getMatchSrcMac();
+		if (matchSrcMac != null) {
+		    match.setDataLayerSource(matchSrcMac);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+		}
+
+		// Match the Destination MAC address
+		String matchDstMac = flowEntryObj.getMatchDstMac();
+		if (matchDstMac == null)
+		    matchDstMac = flowObj.getMatchDstMac();
+		if (matchDstMac != null) {
+		    match.setDataLayerDestination(matchDstMac);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+		}
+
+		// Match the Ethernet Frame Type
+		Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
+		if (matchEthernetFrameType == null)
+		    matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
+		if (matchEthernetFrameType != null) {
+		    match.setDataLayerType(matchEthernetFrameType);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+		}
+
+		// Match the VLAN ID
+		Short matchVlanId = flowEntryObj.getMatchVlanId();
+		if (matchVlanId == null)
+		    matchVlanId = flowObj.getMatchVlanId();
+		if (matchVlanId != null) {
+		    match.setDataLayerVirtualLan(matchVlanId);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+		}
+
+		// Match the VLAN priority
+		Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
+		if (matchVlanPriority == null)
+		    matchVlanPriority = flowObj.getMatchVlanPriority();
+		if (matchVlanPriority != null) {
+		    match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
+		}
+
+		// Match the Source IPv4 Network prefix
+		String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
+		if (matchSrcIPv4Net == null)
+		    matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
+		if (matchSrcIPv4Net != null) {
+		    match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
+		}
+
+		// Match the Destination IPv4 Network prefix
+		String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
+		if (matchDstIPv4Net == null)
+		    matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
+		if (matchDstIPv4Net != null) {
+		    match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
+		}
+
+		// Match the IP protocol
+		Byte matchIpProto = flowEntryObj.getMatchIpProto();
+		if (matchIpProto == null)
+		    matchIpProto = flowObj.getMatchIpProto();
+		if (matchIpProto != null) {
+		    match.setNetworkProtocol(matchIpProto);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+		}
+
+		// Match the IP ToS (DSCP field, 6 bits)
+		Byte matchIpToS = flowEntryObj.getMatchIpToS();
+		if (matchIpToS == null)
+		    matchIpToS = flowObj.getMatchIpToS();
+		if (matchIpToS != null) {
+		    match.setNetworkTypeOfService(matchIpToS);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+		}
+
+		// Match the Source TCP/UDP port
+		Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
+		if (matchSrcTcpUdpPort == null)
+		    matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
+		if (matchSrcTcpUdpPort != null) {
+		    match.setTransportSource(matchSrcTcpUdpPort);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+		}
+
+		// Match the Destination TCP/UDP port
+		Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
+		if (matchDstTcpUdpPort == null)
+		    matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
+		if (matchDstTcpUdpPort != null) {
+		    match.setTransportDestination(matchDstTcpUdpPort);
+		    match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+		}
+
+		//
+		// Fetch the actions
+		//
+		Short actionOutputPort = null;
+		List<OFAction> openFlowActions = new ArrayList<OFAction>();
+		int actionsLen = 0;
+		FlowEntryActions flowEntryActions = null;
+		String actionsStr = flowEntryObj.getActions();
+		if (actionsStr != null)
+		    flowEntryActions = new FlowEntryActions(actionsStr);
+		else
+		    flowEntryActions = new FlowEntryActions();
+		for (FlowEntryAction action : flowEntryActions.actions()) {
+		    ActionOutput actionOutput = action.actionOutput();
+		    ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+		    ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
+		    ActionStripVlan actionStripVlan = action.actionStripVlan();
+		    ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
+		    ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
+		    ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
+		    ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
+		    ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+		    ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
+		    ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
+		    ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+		    if (actionOutput != null) {
+				actionOutputPort = actionOutput.port().value();
+				// XXX: The max length is hard-coded for now
+				OFActionOutput ofa =
+				    new OFActionOutput(actionOutput.port().value(),
+						       (short)0xffff);
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetVlanId != null) {
+				OFActionVirtualLanIdentifier ofa =
+				    new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetVlanPriority != null) {
+				OFActionVirtualLanPriorityCodePoint ofa =
+				    new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionStripVlan != null) {
+				if (actionStripVlan.stripVlan() == true) {
+				    OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+				    openFlowActions.add(ofa);
+				    actionsLen += ofa.getLength();
+				}
+		    }
+
+		    if (actionSetEthernetSrcAddr != null) {
+				OFActionDataLayerSource ofa = 
+				    new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetEthernetDstAddr != null) {
+				OFActionDataLayerDestination ofa =
+				    new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetIPv4SrcAddr != null) {
+				OFActionNetworkLayerSource ofa =
+				    new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetIPv4DstAddr != null) {
+				OFActionNetworkLayerDestination ofa =
+				    new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetIpToS != null) {
+				OFActionNetworkTypeOfService ofa =
+				    new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetTcpUdpSrcPort != null) {
+				OFActionTransportLayerSource ofa =
+				    new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionSetTcpUdpDstPort != null) {
+				OFActionTransportLayerDestination ofa =
+				    new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+
+		    if (actionEnqueue != null) {
+				OFActionEnqueue ofa =
+				    new OFActionEnqueue(actionEnqueue.port().value(),
+							actionEnqueue.queueId());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+		    }
+		}
+
+		fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+		    .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+		    .setPriority(PRIORITY_DEFAULT)
+		    .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+		    .setCookie(cookie)
+		    .setCommand(flowModCommand)
+		    .setMatch(match)
+		    .setActions(openFlowActions)
+		    .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+		fm.setOutPort(OFPort.OFPP_NONE.getValue());
+		if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
+		    (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+		    if (actionOutputPort != null)
+			fm.setOutPort(actionOutputPort);
+		}
+
+		//
+		// TODO: Set the following flag
+		// fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+		// See method ForwardingBase::pushRoute()
+		//
+
+		//
+		// Write the message to the switch
+		//
+		log.debug("MEASUREMENT: Installing flow entry " + userState +
+			  " into switch DPID: " +
+			  sw.getStringId() +
+			  " flowEntryId: " + flowEntryId.toString() +
+			  " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
+			  " inPort: " + matchInPort + " outPort: " + actionOutputPort
+			  );
+		send(sw,fm);
+	    //
+	    // TODO: We should use the OpenFlow Barrier mechanism
+	    // to check for errors, and update the SwitchState
+	    // for a flow entry after the Barrier message
+	    // is received.
+	    //
+	    flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+
+		return true;
+	}
+	
+	/** Returns the queue registered for the given switch, or null if sw is null or has no queue. */
+	private SwitchQueue getQueue(IOFSwitch sw) {
+		if (sw == null)  {
+			return null;
+		}
+		return queues.get(sw);
+	}
 }