Merge in Pavlin's changes from master
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 3725d5e..c70d4db 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -1,6 +1,5 @@
 package net.onrc.onos.ofcontroller.forwarding;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -18,13 +17,13 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.floodlightcontroller.packet.Ethernet;
 import net.floodlightcontroller.util.MACAddress;
-import net.onrc.onos.datagrid.IDatagridService;
 import net.onrc.onos.ofcontroller.core.IDeviceStorage;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
 import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
 import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
@@ -50,16 +49,17 @@
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
 
 public class Forwarding implements IOFMessageListener, IFloodlightModule,
 									IForwardingService {
 	private final static Logger log = LoggerFactory.getLogger(Forwarding.class);
 
+	private final int IDLE_TIMEOUT = 5; // seconds
+	private final int HARD_TIMEOUT = 0; // seconds
+	
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
-	@SuppressWarnings("unused")
-	private IDatagridService datagridService;
+	private IFlowPusherService flowPusher;
 	
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
@@ -67,6 +67,8 @@
 	private Map<Path, Long> pendingFlows;
 	private Multimap<Long, PacketToPush> waitingPackets;
 	
+	private final Object lock = new Object();
+	
 	public class PacketToPush {
 		public final OFPacketOut packet;
 		public final long dpid;
@@ -130,22 +132,23 @@
 				new ArrayList<Class<? extends IFloodlightService>>();
 		dependencies.add(IFloodlightProviderService.class);
 		dependencies.add(IFlowService.class);
-		dependencies.add(IDatagridService.class);
+		dependencies.add(IFlowPusherService.class);
 		return dependencies;
 	}
 	
 	@Override
 	public void init(FloodlightModuleContext context) {
-		this.floodlightProvider = 
+		floodlightProvider = 
 				context.getServiceImpl(IFloodlightProviderService.class);
-		this.flowService = context.getServiceImpl(IFlowService.class);
-		this.datagridService = context.getServiceImpl(IDatagridService.class);
+		flowService = context.getServiceImpl(IFlowService.class);
+		flowPusher = context.getServiceImpl(IFlowPusherService.class);
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
 		pendingFlows = new ConcurrentHashMap<Path, Long>();
-		waitingPackets = Multimaps.synchronizedSetMultimap(
-				HashMultimap.<Long, PacketToPush>create());
+		//waitingPackets = Multimaps.synchronizedSetMultimap(
+				//HashMultimap.<Long, PacketToPush>create());
+		waitingPackets = HashMultimap.create();
 		
 		deviceStorage = new DeviceStorageImpl();
 		deviceStorage.init("");
@@ -231,84 +234,87 @@
 		MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
 		
 		
-		DataPath datapath = new DataPath();
-		datapath.setSrcPort(srcSwitchPort);
-		datapath.setDstPort(dstSwitchPort);
-		
-		
+		FlowPath flowPath, reverseFlowPath;
 		
 		Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
 		// TODO check concurrency
-		Long existingFlowId = pendingFlows.get(pathspec);
-		
-		if (existingFlowId != null) {
-			log.debug("Found existing flow {}", 
-					HexString.toHexString(existingFlowId));
+		synchronized (lock) {
+			Long existingFlowId = pendingFlows.get(pathspec);
 			
-			// TODO do stuff.
-			OFPacketOut po = constructPacketOut(datapath, pi, sw);
-			waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
-			return;
+			if (existingFlowId != null) {
+				log.debug("Found existing flow {}", 
+						HexString.toHexString(existingFlowId));
+				
+				OFPacketOut po = constructPacketOut(pi, sw);
+				waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
+				return;
+			}
+			
+			log.debug("Adding new flow between {} at {} and {} at {}",
+					new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
+			
+			
+			CallerId callerId = new CallerId("Forwarding");
+			
+			DataPath datapath = new DataPath();
+			datapath.setSrcPort(srcSwitchPort);
+			datapath.setDstPort(dstSwitchPort);
+			
+			flowPath = new FlowPath();
+			flowPath.setInstallerId(callerId);
+	
+			flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+			flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+			flowPath.setFlowEntryMatch(new FlowEntryMatch());
+			flowPath.setIdleTimeout(IDLE_TIMEOUT);
+			flowPath.setHardTimeout(HARD_TIMEOUT);
+			flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
+			flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
+			flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+			flowPath.setDataPath(datapath);
+			
+			
+			DataPath reverseDataPath = new DataPath();
+			// Reverse the ports for the reverse path
+			reverseDataPath.setSrcPort(dstSwitchPort);
+			reverseDataPath.setDstPort(srcSwitchPort);
+			
+			// TODO implement copy constructor for FlowPath
+			reverseFlowPath = new FlowPath();
+			reverseFlowPath.setInstallerId(callerId);
+			reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+			reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+			reverseFlowPath.setIdleTimeout(IDLE_TIMEOUT);
+			reverseFlowPath.setHardTimeout(HARD_TIMEOUT);
+			reverseFlowPath.setFlowEntryMatch(new FlowEntryMatch());
+			// Reverse the MAC addresses for the reverse path
+			reverseFlowPath.flowEntryMatch().enableSrcMac(dstMacAddress);
+			reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
+			reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+			reverseFlowPath.setDataPath(reverseDataPath);
+			reverseFlowPath.dataPath().srcPort().dpid().toString();
+			
+			// TODO what happens if no path exists? cleanup
+			
+			FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
+			FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
+			
+			flowPath.setFlowId(flowId);
+			reverseFlowPath.setFlowId(reverseFlowId);
+			
+			OFPacketOut po = constructPacketOut(pi, sw);
+			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+			
+			// Add to waiting lists
+			pendingFlows.put(pathspec, flowId.value());
+			pendingFlows.put(reversePathSpec, reverseFlowId.value());
+			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
+		
 		}
 		
-		
-		log.debug("Adding new flow between {} at {} and {} at {}",
-				new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
-		
-		
-		CallerId callerId = new CallerId("Forwarding");
-		
-		//FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
-		FlowPath flowPath = new FlowPath();
-		//flowPath.setFlowId(flowId);
-		flowPath.setInstallerId(callerId);
-
-		flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
-		flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
-		flowPath.setFlowEntryMatch(new FlowEntryMatch());
-		flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
-		flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
-		// For now just forward IPv4 packets. This prevents accidentally
-		// forwarding other stuff like ARP.
-		flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
-		flowPath.setDataPath(datapath);
-			
-		DataPath reverseDataPath = new DataPath();
-		// Reverse the ports for the reverse path
-		reverseDataPath.setSrcPort(dstSwitchPort);
-		reverseDataPath.setDstPort(srcSwitchPort);
-		
-		// TODO implement copy constructor for FlowPath
-		FlowPath reverseFlowPath = new FlowPath();
-		//reverseFlowPath.setFlowId(reverseFlowId);
-		reverseFlowPath.setInstallerId(callerId);
-		reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
-		reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
-		reverseFlowPath.setFlowEntryMatch(new FlowEntryMatch());
-		// Reverse the MAC addresses for the reverse path
-		reverseFlowPath.flowEntryMatch().enableSrcMac(dstMacAddress);
-		reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
-		reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
-		reverseFlowPath.setDataPath(reverseDataPath);
-		reverseFlowPath.dataPath().srcPort().dpid().toString();
-		
-		// TODO what happens if no path exists?
-		FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
-		FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
-		
-		flowPath.setFlowId(flowId);
-		reverseFlowPath.setFlowId(reverseFlowId);
-		
-		OFPacketOut po = constructPacketOut(datapath, pi, sw);
-		Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
-		
-		// Add to waiting lists
-		pendingFlows.put(pathspec, flowId.value());
-		pendingFlows.put(reversePathSpec, reverseFlowId.value());
-		waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
-		
 		flowService.addFlow(reverseFlowPath);
 		flowService.addFlow(flowPath);
+		
 	}
 	
 	/*
@@ -339,11 +345,7 @@
 	}
 	*/
 
-	private OFPacketOut constructPacketOut(DataPath datapath, OFPacketIn pi, 
-			IOFSwitch sw) {
-		//List<OFAction> actions = new ArrayList<OFAction>(1);
-		//actions.add(new OFActionOutput(port));
-		
+	private OFPacketOut constructPacketOut(OFPacketIn pi, IOFSwitch sw) {	
 		OFPacketOut po = new OFPacketOut();
 		po.setInPort(OFPort.OFPP_NONE)
 		.setInPort(pi.getInPort())
@@ -373,12 +375,17 @@
 		// TODO check concurrency
 		// will need to sync and access both collections at once.
 		long flowId = installedFlowPath.flowId().value();
-		Collection<PacketToPush> packets = waitingPackets.removeAll(flowId);
 		
-		//remove pending flows entry
-		Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
-				installedFlowPath.dataPath().dstPort());
-		pendingFlows.remove(pathToRemove);
+		Collection<PacketToPush> packets;
+		synchronized (lock) {
+			packets = waitingPackets.removeAll(flowId);
+			
+			//remove pending flows entry
+			Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
+					installedFlowPath.dataPath().dstPort());
+			pendingFlows.remove(pathToRemove);
+			
+		}
 		
 		for (PacketToPush packet : packets) {
 			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
@@ -391,13 +398,7 @@
 					(po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
 			po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
 			
-			try {
-				sw.write(packet.packet, null);
-				sw.flush();
-			} catch (IOException e) {
-				log.error("Error writing packet out to switch {}:", 
-						sw.getId(), e);
-			}
+			flowPusher.add(sw, po);
 		}
 	}
 }