Added FlowPath/FlowEntry version interfaces to FlowPusher
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index a3f602f..0e70a9b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -785,7 +785,7 @@
      */
     private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
 				    IFlowEntry flowEntryObj) {
-    	return pusher.send(mySwitch, flowObj, flowEntryObj);
+    	return pusher.add(mySwitch, flowObj, flowEntryObj);
     }
 
     /**
@@ -798,14 +798,7 @@
      */
     private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
 				    FlowEntry flowEntry) {
-    	log.debug("Flow is sent to pusher : dpid({}) flow_id({})", mySwitch.getId(), flowEntry.getFlowId());
-    	// TODO  handle this installation by FlowPusher
-    	
-//	return FlowSwitchOperation.installFlowEntry(
-//		floodlightProvider.getOFMessageFactory(),
-//		messageDamper, mySwitch, flowPath, flowEntry);
-    	
-    	return true;
+    	return pusher.add(mySwitch, flowPath, flowEntry);
     }
 
     /**
@@ -920,7 +913,7 @@
 			return;
 		}
 		
-		pusher.send(sw, msg);
+		pusher.add(sw, msg);
 	}
 
 	@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 67d84e2..eef60a1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -17,13 +17,20 @@
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.util.MACAddress;
 import net.floodlightcontroller.util.OFMessageDamper;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction;
 import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryActions;
 import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.IPv4Net;
+import net.onrc.onos.ofcontroller.util.Port;
 
 /**
  * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
@@ -81,9 +88,9 @@
 			}
 		}
 		
-		void updateRate(long current, OFMessage msg) {
+		void logSentData(long current, long size) {
 			last_sent_time = current;
-			last_sent_size = msg.getLengthU();
+			last_sent_size = size;
 		}
 		
 	}
@@ -127,33 +134,32 @@
 					
 					// check sending rate and determine it to be sent or not
 					long current_time = System.nanoTime();
+					long size = 0;
 					
 					synchronized (queue) {
 						if (queue.isSendable(current_time)) {
+							// TODO limit number of messages to be sent at once
 							while (! queue.isEmpty()) {
 								OFMessage msg = queue.poll();
 								
 								// if need to send, call IOFSwitch#write()
 								try {
 									messageDamper.write(sw, msg, context);
-									queue.updateRate(current_time, msg);
+									size += msg.getLength();
 									log.debug("Pusher sends message : {}", msg);
 								} catch (IOException e) {
 									// TODO Auto-generated catch block
 									e.printStackTrace();
 								}
 							}
+							queue.logSentData(current_time, size);
 						}
 					}
+					sw.flush();
 				}
 				
 				// sleep while all queues are empty
-				while (! isMsgAdded) {
-					if (isStopped) {
-						log.debug("Pusher Process finished.");
-						return;
-					}
-					
+				while (! (isMsgAdded || isStopped)) {
 					try {
 						Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
 					} catch (InterruptedException e) {
@@ -275,7 +281,7 @@
 	 * @param sw
 	 * @param msg
 	 */
-	public boolean send(IOFSwitch sw, OFMessage msg) {
+	public boolean add(IOFSwitch sw, OFMessage msg) {
 		SwitchQueue queue = getQueue(sw);
 		if (queue == null) {
 			queue = new SwitchQueue();
@@ -303,7 +309,7 @@
 	 * @param flowEntryObj
 	 * @return
 	 */
-	public boolean send(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
+	public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
 		log.debug("sending : {}, {}", sw, flowObj);
 		String flowEntryIdStr = flowEntryObj.getFlowEntryId();
 		if (flowEntryIdStr == null)
@@ -595,7 +601,7 @@
 			  " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
 			  " inPort: " + matchInPort + " outPort: " + actionOutputPort
 			  );
-		send(sw,fm);
+		add(sw,fm);
 	    //
 	    // TODO: We should use the OpenFlow Barrier mechanism
 	    // to check for errors, and update the SwitchState
@@ -607,6 +613,317 @@
 		return true;
 	}
 	
+	public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
+		//
+		// Create the OpenFlow Flow Modification Entry to push
+		//
+		OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+		long cookie = flowEntry.flowEntryId().value();
+
+		short flowModCommand = OFFlowMod.OFPFC_ADD;
+		if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+			flowModCommand = OFFlowMod.OFPFC_ADD;
+		} else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+			flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+		} else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+			flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+		} else {
+			// Unknown user state. Ignore the entry
+			log.debug(
+					"Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+					flowEntry.flowEntryId().toString(),
+					flowEntry.flowEntryUserState());
+			return false;
+		}
+
+		//
+		// Fetch the match conditions.
+		//
+		// NOTE: The Flow matching conditions common for all Flow Entries are
+		// used ONLY if a Flow Entry does NOT have the corresponding matching
+		// condition set.
+		//
+		OFMatch match = new OFMatch();
+		match.setWildcards(OFMatch.OFPFW_ALL);
+		FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
+		FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+		// Match the Incoming Port
+		Port matchInPort = flowEntryMatch.inPort();
+		if (matchInPort != null) {
+			match.setInputPort(matchInPort.value());
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+		}
+
+		// Match the Source MAC address
+		MACAddress matchSrcMac = flowEntryMatch.srcMac();
+		if ((matchSrcMac == null) && (flowPathMatch != null)) {
+			matchSrcMac = flowPathMatch.srcMac();
+		}
+		if (matchSrcMac != null) {
+			match.setDataLayerSource(matchSrcMac.toString());
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+		}
+
+		// Match the Destination MAC address
+		MACAddress matchDstMac = flowEntryMatch.dstMac();
+		if ((matchDstMac == null) && (flowPathMatch != null)) {
+			matchDstMac = flowPathMatch.dstMac();
+		}
+		if (matchDstMac != null) {
+			match.setDataLayerDestination(matchDstMac.toString());
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+		}
+
+		// Match the Ethernet Frame Type
+		Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+		if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
+			matchEthernetFrameType = flowPathMatch.ethernetFrameType();
+		}
+		if (matchEthernetFrameType != null) {
+			match.setDataLayerType(matchEthernetFrameType);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+		}
+
+		// Match the VLAN ID
+		Short matchVlanId = flowEntryMatch.vlanId();
+		if ((matchVlanId == null) && (flowPathMatch != null)) {
+			matchVlanId = flowPathMatch.vlanId();
+		}
+		if (matchVlanId != null) {
+			match.setDataLayerVirtualLan(matchVlanId);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+		}
+
+		// Match the VLAN priority
+		Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+		if ((matchVlanPriority == null) && (flowPathMatch != null)) {
+			matchVlanPriority = flowPathMatch.vlanPriority();
+		}
+		if (matchVlanPriority != null) {
+			match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+			match.setWildcards(match.getWildcards()
+					& ~OFMatch.OFPFW_DL_VLAN_PCP);
+		}
+
+		// Match the Source IPv4 Network prefix
+		IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+		if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
+			matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
+		}
+		if (matchSrcIPv4Net != null) {
+			match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+		}
+
+		// Natch the Destination IPv4 Network prefix
+		IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+		if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
+			matchDstIPv4Net = flowPathMatch.dstIPv4Net();
+		}
+		if (matchDstIPv4Net != null) {
+			match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+		}
+
+		// Match the IP protocol
+		Byte matchIpProto = flowEntryMatch.ipProto();
+		if ((matchIpProto == null) && (flowPathMatch != null)) {
+			matchIpProto = flowPathMatch.ipProto();
+		}
+		if (matchIpProto != null) {
+			match.setNetworkProtocol(matchIpProto);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+		}
+
+		// Match the IP ToS (DSCP field, 6 bits)
+		Byte matchIpToS = flowEntryMatch.ipToS();
+		if ((matchIpToS == null) && (flowPathMatch != null)) {
+			matchIpToS = flowPathMatch.ipToS();
+		}
+		if (matchIpToS != null) {
+			match.setNetworkTypeOfService(matchIpToS);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+		}
+
+		// Match the Source TCP/UDP port
+		Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+		if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
+			matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
+		}
+		if (matchSrcTcpUdpPort != null) {
+			match.setTransportSource(matchSrcTcpUdpPort);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+		}
+
+		// Match the Destination TCP/UDP port
+		Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+		if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
+			matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
+		}
+		if (matchDstTcpUdpPort != null) {
+			match.setTransportDestination(matchDstTcpUdpPort);
+			match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+		}
+
+		//
+		// Fetch the actions
+		//
+		Short actionOutputPort = null;
+		List<OFAction> openFlowActions = new ArrayList<OFAction>();
+		int actionsLen = 0;
+		FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+		//
+		for (FlowEntryAction action : flowEntryActions.actions()) {
+			ActionOutput actionOutput = action.actionOutput();
+			ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+			ActionSetVlanPriority actionSetVlanPriority = action
+					.actionSetVlanPriority();
+			ActionStripVlan actionStripVlan = action.actionStripVlan();
+			ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+					.actionSetEthernetSrcAddr();
+			ActionSetEthernetAddr actionSetEthernetDstAddr = action
+					.actionSetEthernetDstAddr();
+			ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+					.actionSetIPv4SrcAddr();
+			ActionSetIPv4Addr actionSetIPv4DstAddr = action
+					.actionSetIPv4DstAddr();
+			ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+			ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+					.actionSetTcpUdpSrcPort();
+			ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+					.actionSetTcpUdpDstPort();
+			ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+			if (actionOutput != null) {
+				actionOutputPort = actionOutput.port().value();
+				// XXX: The max length is hard-coded for now
+				OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+						.value(), (short) 0xffff);
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetVlanId != null) {
+				OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+						actionSetVlanId.vlanId());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetVlanPriority != null) {
+				OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+						actionSetVlanPriority.vlanPriority());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionStripVlan != null) {
+				if (actionStripVlan.stripVlan() == true) {
+					OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+					openFlowActions.add(ofa);
+					actionsLen += ofa.getLength();
+				}
+			}
+
+			if (actionSetEthernetSrcAddr != null) {
+				OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+						actionSetEthernetSrcAddr.addr().toBytes());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetEthernetDstAddr != null) {
+				OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+						actionSetEthernetDstAddr.addr().toBytes());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetIPv4SrcAddr != null) {
+				OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+						actionSetIPv4SrcAddr.addr().value());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetIPv4DstAddr != null) {
+				OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+						actionSetIPv4DstAddr.addr().value());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetIpToS != null) {
+				OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+						actionSetIpToS.ipToS());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetTcpUdpSrcPort != null) {
+				OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+						actionSetTcpUdpSrcPort.port());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionSetTcpUdpDstPort != null) {
+				OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+						actionSetTcpUdpDstPort.port());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+
+			if (actionEnqueue != null) {
+				OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+						.value(), actionEnqueue.queueId());
+				openFlowActions.add(ofa);
+				actionsLen += ofa.getLength();
+			}
+		}
+
+		fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+				.setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+				.setPriority(PRIORITY_DEFAULT)
+				.setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+				.setCommand(flowModCommand).setMatch(match)
+				.setActions(openFlowActions)
+				.setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+		fm.setOutPort(OFPort.OFPP_NONE.getValue());
+		if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+				|| (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+			if (actionOutputPort != null)
+				fm.setOutPort(actionOutputPort);
+		}
+
+		//
+		// TODO: Set the following flag
+		// fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+		// See method ForwardingBase::pushRoute()
+		//
+
+		//
+		// Write the message to the switch
+		//
+		log.debug("MEASUREMENT: Installing flow entry "
+				+ flowEntry.flowEntryUserState() + " into switch DPID: "
+				+ sw.getStringId() + " flowEntryId: "
+				+ flowEntry.flowEntryId().toString() + " srcMac: "
+				+ matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
+				+ matchInPort + " outPort: " + actionOutputPort);
+		
+		//
+		// TODO: We should use the OpenFlow Barrier mechanism
+		// to check for errors, and update the SwitchState
+		// for a flow entry after the Barrier message is
+		// is received.
+		//
+		// TODO: The FlowEntry Object in Titan should be set
+		// to FE_SWITCH_UPDATED.
+		//
+		
+		return add(sw,fm);
+	}
+
 	private SwitchQueue getQueue(IOFSwitch sw) {
 		if (sw == null)  {
 			return null;