Merge pull request #505 from jonohart/fw

Forwarding bug fixes
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 02972e2..133f29c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -7,7 +7,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -62,6 +61,8 @@
 
 	private final int IDLE_TIMEOUT = 5; // seconds
 	private final int HARD_TIMEOUT = 0; // seconds
+
+	private final int PATH_PUSHED_TIMEOUT = 3000; // milliseconds
 	
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
@@ -71,12 +72,15 @@
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
 	
-	private Map<Path, Long> pendingFlows;
+	//private Map<Path, Long> pendingFlows;
+	// TODO it seems there is a Guava collection that will time out entries.
+	// We should see if this will work here.
+	private Map<Path, PushedFlow> pendingFlows;
 	private ListMultimap<Long, PacketToPush> waitingPackets;
 	
 	private final Object lock = new Object();
 	
-	public class PacketToPush {
+	private class PacketToPush {
 		public final OFPacketOut packet;
 		public final long dpid;
 		
@@ -86,15 +90,35 @@
 		}
 	}
 	
-	public final class Path {
+	private class PushedFlow {
+		public final long flowId; // ID of the flow this entry tracks
+		private final long pushedTime; // System.nanoTime() reading taken at construction
+		public short firstHopOutPort = OFPort.OFPP_NONE.getValue(); // OFPP_NONE until an out port is recorded
+		
+		public PushedFlow(long flowId) {
+			this.flowId = flowId;
+			pushedTime = System.nanoTime(); // elapsed-time clock, not tied to wall-clock changes
+		}
+		
+		public boolean isExpired() { // true once more than PATH_PUSHED_TIMEOUT ms have elapsed
+			return (System.nanoTime() - pushedTime) > PATH_PUSHED_TIMEOUT * 1000000L;
+		}
+	}
+	
+	private final class Path {
 		public final SwitchPort srcPort;
 		public final SwitchPort dstPort;
+		public final MACAddress srcMac;
+		public final MACAddress dstMac;
 		
-		public Path(SwitchPort src, SwitchPort dst) {
+		public Path(SwitchPort src, SwitchPort dst, 
+				MACAddress srcMac, MACAddress dstMac) { // ports are copied below; MACs are stored by reference
 			srcPort = new SwitchPort(new Dpid(src.dpid().value()), 
 					new Port(src.port().value()));
 			dstPort = new SwitchPort(new Dpid(dst.dpid().value()), 
 					new Port(dst.port().value()));
+			this.srcMac = srcMac; // compared in equals() and hashed in hashCode()
+			this.dstMac = dstMac; // compared in equals() and hashed in hashCode()
 		}
 		
 		@Override
@@ -105,7 +129,9 @@
 			
 			Path otherPath = (Path) other;
 			return srcPort.equals(otherPath.srcPort) && 
-					dstPort.equals(otherPath.dstPort);
+					dstPort.equals(otherPath.dstPort) &&
+					srcMac.equals(otherPath.srcMac) &&
+					dstMac.equals(otherPath.dstMac);
 		}
 		
 		@Override
@@ -113,8 +139,16 @@
 			int hash = 17;
 			hash = 31 * hash + srcPort.hashCode();
 			hash = 31 * hash + dstPort.hashCode();
+			hash = 31 * hash + srcMac.hashCode();
+			hash = 31 * hash + dstMac.hashCode();
 			return hash;
 		}
+		
+		@Override
+		public String toString() { // format: (srcMac at srcPort) => (dstMac at dstPort)
+			return "(" + srcMac + " at " + srcPort + ") => (" 
+					+ dstMac + " at " + dstPort + ")";
+		}
 	}
 	
 	@Override
@@ -154,7 +188,8 @@
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
-		pendingFlows = new ConcurrentHashMap<Path, Long>();
+		//pendingFlows = new ConcurrentHashMap<Path, Long>();
+		pendingFlows = new HashMap<Path, PushedFlow>();
 		//waitingPackets = Multimaps.synchronizedSetMultimap(
 				//HashMultimap.<Long, PacketToPush>create());
 		//waitingPackets = HashMultimap.create();
@@ -280,20 +315,35 @@
 		
 		FlowPath flowPath, reverseFlowPath;
 		
-		Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
+		Path pathspec = new Path(srcSwitchPort, dstSwitchPort, 
+				srcMacAddress, dstMacAddress);
 		// TODO check concurrency
 		synchronized (lock) {
-			Long existingFlowId = pendingFlows.get(pathspec);
+			PushedFlow existingFlow = pendingFlows.get(pathspec);
+			//Long existingFlowId = pendingFlows.get(pathspec);
 			
-			if (existingFlowId != null) {
+			if (existingFlow != null && !existingFlow.isExpired()) {
 				log.debug("Found existing flow {}", 
-						HexString.toHexString(existingFlowId));
+						HexString.toHexString(existingFlow.flowId));
 				
 				OFPacketOut po = constructPacketOut(pi, sw);
-				waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
+				
+				if (existingFlow.firstHopOutPort != OFPort.OFPP_NONE.getValue()) {
+					// Flow has been sent to the switches so it is safe to
+					// send a packet out now
+					sendPacketOut(sw, po, existingFlow.firstHopOutPort);
+				}
+				else {
+					// Flow has not yet been sent to switches so save the
+					// packet out for later
+					waitingPackets.put(existingFlow.flowId, 
+							new PacketToPush(po, sw.getId()));
+				}
 				return;
 			}
 			
+			//log.debug("Couldn't match {} in {}", pathspec, pendingFlows);
+			
 			log.debug("Adding new flow between {} at {} and {} at {}",
 					new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
 			
@@ -347,11 +397,14 @@
 			reverseFlowPath.setFlowId(reverseFlowId);
 			
 			OFPacketOut po = constructPacketOut(pi, sw);
-			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort, 
+					dstMacAddress, srcMacAddress);
 			
 			// Add to waiting lists
-			pendingFlows.put(pathspec, flowId.value());
-			pendingFlows.put(reversePathSpec, reverseFlowId.value());
+			//pendingFlows.put(pathspec, flowId.value());
+			//pendingFlows.put(reversePathSpec, reverseFlowId.value());
+			pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
+			pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
 			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
 		
 		}
@@ -416,33 +469,42 @@
 	}
 
 	private void flowInstalled(FlowPath installedFlowPath) {
-		// TODO check concurrency
-		// will need to sync and access both collections at once.
+		// Sends the packets queued for this flow and records its first-hop out port.
 		long flowId = installedFlowPath.flowId().value();
 		
+		short outPort = installedFlowPath.flowEntries().get(0).outPort().value();
+		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
+		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
+			log.debug("Flow {} has been installed, sending queued packets",
+					installedFlowPath.flowId());
 			packets = waitingPackets.removeAll(flowId);
 			
-			//remove pending flows entry
-			Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
-					installedFlowPath.dataPath().dstPort());
-			pendingFlows.remove(pathToRemove);
-			
+			// Record the first-hop port; get() returns null if no entry exists
+			Path installedPath = new Path(installedFlowPath.dataPath().srcPort(),
+					installedFlowPath.dataPath().dstPort(),
+					srcMacAddress, dstMacAddress);
+			PushedFlow pushedFlow = pendingFlows.get(installedPath);
+			if (pushedFlow != null) {
+				pushedFlow.firstHopOutPort = outPort;
+			}
 		}
 		
 		for (PacketToPush packet : packets) {
 			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
 			
-			OFPacketOut po = packet.packet;
-			short outPort = 
-					installedFlowPath.flowEntries().get(0).outPort().value();
-			po.getActions().add(new OFActionOutput(outPort));
-			po.setActionsLength((short)
-					(po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
-			po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
-			
-			flowPusher.add(sw, po);
+			sendPacketOut(sw, packet.packet, outPort);
 		}
 	}
+	
+	// Appends an output action for outPort to po, grows both length fields, then hands po to flowPusher.
+	private void sendPacketOut(IOFSwitch sw, OFPacketOut po, short outPort) {
+		final int actionLength = OFActionOutput.MINIMUM_LENGTH;
+		po.getActions().add(new OFActionOutput(outPort));
+		po.setActionsLength((short) (po.getActionsLength() + actionLength));
+		po.setLengthU(po.getLengthU() + actionLength);
+		flowPusher.add(sw, po);
+	}
 }