Merge branch 'master' into fw
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8b1f7c0..68cd844 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -273,6 +273,10 @@
 
 	for (FlowPath flowPath : flowPaths) {
 	    boolean isInstalled = true;
+	    
+	    if (flowPath.flowEntries().isEmpty()) {
+	    	continue;
+	    }
 
 	    //
 	    // Check whether all Flow Entries have been installed
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index dd98f4e..a5717e6 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -400,6 +400,11 @@
 	if (srcDpid.value() != sw.getId())
 	    return;
 	deleteFlow(flowPath.flowId());
+	
+	// Send flow deleted notification to the Forwarding module
+	// TODO This is a quick fix for flow-removed notifications. We
+	// should think more about the design of these notifications.
+	notificationFlowPathRemoved(flowPath);
     }
 
     /**
@@ -469,6 +474,20 @@
     }
 
     /**
+     * Generate a notification that a FlowPath has been removed from the 
+     * network. This means we've received an expiry message for the flow
+     * from the switch, and send flowmods to remove any remaining parts of
+     * the path.
+     * 
+     * @param flowPath FlowPath object that was removed from the network.
+     */
+    void notificationFlowPathRemoved(FlowPath flowPath) {
+	if (forwardingService != null) {
+		forwardingService.flowRemoved(flowPath);
+	}
+    }
+
+    /**
      * Push modified Flow-related state as appropriate.
      *
      * @param modifiedFlowPaths the collection of modified Flow Paths.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 6d953f1..bb92cf9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -25,12 +25,13 @@
 import net.onrc.onos.ofcontroller.devicemanager.IOnosDeviceService;
 import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
 import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
 import net.onrc.onos.ofcontroller.proxyarp.BroadcastPacketOutNotification;
+import net.onrc.onos.ofcontroller.proxyarp.IProxyArpService;
 import net.onrc.onos.ofcontroller.topology.TopologyManager;
 import net.onrc.onos.ofcontroller.util.CallerId;
 import net.onrc.onos.ofcontroller.util.DataPath;
 import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
 import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
 import net.onrc.onos.ofcontroller.util.FlowId;
 import net.onrc.onos.ofcontroller.util.FlowPath;
@@ -59,8 +60,8 @@
 
 	private final int IDLE_TIMEOUT = 5; // seconds
 	private final int HARD_TIMEOUT = 0; // seconds
-
-	private final int PATH_PUSHED_TIMEOUT = 3000; // milliseconds
+	
+	private final CallerId callerId = new CallerId("Forwarding");
 	
 	private IFloodlightProviderService floodlightProvider;
 	private IFlowService flowService;
@@ -70,7 +71,6 @@
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
 	
-	//private Map<Path, Long> pendingFlows;
 	// TODO it seems there is a Guava collection that will time out entries.
 	// We should see if this will work here.
 	private Map<Path, PushedFlow> pendingFlows;
@@ -90,31 +90,18 @@
 	
 	private class PushedFlow {
 		public final long flowId;
-		private final long pushedTime;
-		public short firstHopOutPort = OFPort.OFPP_NONE.getValue();
+		public boolean installed = false;
 		
 		public PushedFlow(long flowId) {
 			this.flowId = flowId;
-			pushedTime = System.currentTimeMillis();
-		}
-		
-		public boolean isExpired() {
-			return (System.currentTimeMillis() - pushedTime) > PATH_PUSHED_TIMEOUT;
 		}
 	}
 	
 	private final class Path {
-		public final SwitchPort srcPort;
-		public final SwitchPort dstPort;
 		public final MACAddress srcMac;
 		public final MACAddress dstMac;
 		
-		public Path(SwitchPort src, SwitchPort dst, 
-				MACAddress srcMac, MACAddress dstMac) {
-			srcPort = new SwitchPort(new Dpid(src.dpid().value()), 
-					new Port(src.port().value()));
-			dstPort = new SwitchPort(new Dpid(dst.dpid().value()), 
-					new Port(dst.port().value()));
+		public Path(MACAddress srcMac, MACAddress dstMac) {
 			this.srcMac = srcMac;
 			this.dstMac = dstMac;
 		}
@@ -126,17 +113,13 @@
 			}
 			
 			Path otherPath = (Path) other;
-			return srcPort.equals(otherPath.srcPort) && 
-					dstPort.equals(otherPath.dstPort) &&
-					srcMac.equals(otherPath.srcMac) &&
+			return srcMac.equals(otherPath.srcMac) &&
 					dstMac.equals(otherPath.dstMac);
 		}
 		
 		@Override
 		public int hashCode() {
 			int hash = 17;
-			hash = 31 * hash + srcPort.hashCode();
-			hash = 31 * hash + dstPort.hashCode();
 			hash = 31 * hash + srcMac.hashCode();
 			hash = 31 * hash + dstMac.hashCode();
 			return hash;
@@ -144,8 +127,7 @@
 		
 		@Override
 		public String toString() {
-			return "(" + srcMac + " at " + srcPort + ") => (" 
-					+ dstPort + " at " + dstMac + ")";
+			return "(" + srcMac + ") => (" + dstMac + ")";
 		}
 	}
 	
@@ -188,12 +170,8 @@
 		datagrid = context.getServiceImpl(IDatagridService.class);
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
-		
-		//pendingFlows = new ConcurrentHashMap<Path, Long>();
+
 		pendingFlows = new HashMap<Path, PushedFlow>();
-		//waitingPackets = Multimaps.synchronizedSetMultimap(
-				//HashMultimap.<Long, PacketToPush>create());
-		//waitingPackets = HashMultimap.create();
 		waitingPackets = LinkedListMultimap.create();
 		
 		deviceStorage = new DeviceStorageImpl();
@@ -243,7 +221,6 @@
 		
 		if (eth.isBroadcast() || eth.isMulticast()) {
 			handleBroadcast(sw, pi, eth);
-			//return Command.CONTINUE;
 		}
 		else {
 			// Unicast
@@ -297,26 +274,47 @@
 		MACAddress srcMacAddress = MACAddress.valueOf(eth.getSourceMACAddress());
 		MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
 		
-		
 		FlowPath flowPath, reverseFlowPath;
 		
-		Path pathspec = new Path(srcSwitchPort, dstSwitchPort, 
-				srcMacAddress, dstMacAddress);
+		Path pathspec = new Path(srcMacAddress, dstMacAddress);
 		// TODO check concurrency
 		synchronized (lock) {
 			PushedFlow existingFlow = pendingFlows.get(pathspec);
-			//Long existingFlowId = pendingFlows.get(pathspec);
-			
-			if (existingFlow != null && !existingFlow.isExpired()) {
+
+			if (existingFlow != null) {
+				// We've already installed a flow for this pair of MAC addresses
 				log.debug("Found existing flow {}", 
 						HexString.toHexString(existingFlow.flowId));
 				
 				OFPacketOut po = constructPacketOut(pi, sw);
 				
-				if (existingFlow.firstHopOutPort != OFPort.OFPP_NONE.getValue()) {
+				// Find the correct port here. We just assume the PI is from 
+				// the first hop switch, but this is definitely not always
+				// the case. We'll have to retrieve the flow from HZ every time
+				// because it could change (be rerouted) sometimes.
+				if (existingFlow.installed) {
 					// Flow has been sent to the switches so it is safe to
 					// send a packet out now
-					sendPacketOut(sw, po, existingFlow.firstHopOutPort);
+					FlowPath flow = datagrid.getFlow(new FlowId(existingFlow.flowId));
+					FlowEntry flowEntryForThisSwitch = null;
+					for (FlowEntry flowEntry : flow.flowEntries()) {
+						if (flowEntry.dpid().equals(new Dpid(sw.getId()))) {
+							flowEntryForThisSwitch = flowEntry;
+							break;
+						}
+					}
+					
+					if (flowEntryForThisSwitch == null) {
+						// If we don't find a flow entry for that switch, then we're
+						// in the middle of a rerouting (or something's gone wrong). 
+						// This packet will be dropped as a victim of the rerouting.
+						log.debug("Dropping packet on flow {} between {}-{}, flow path {}",
+								new Object[] {new FlowId(existingFlow.flowId),
+								srcMacAddress, dstMacAddress, flow});
+					}
+					else {
+						sendPacketOut(sw, po, flowEntryForThisSwitch.outPort().value());
+					}
 				}
 				else {
 					// Flow has not yet been sent to switches so save the
@@ -326,21 +324,16 @@
 				}
 				return;
 			}
-			
-			//log.debug("Couldn't match {} in {}", pathspec, pendingFlows);
-			
+
 			log.debug("Adding new flow between {} at {} and {} at {}",
 					new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
 			
-			
-			CallerId callerId = new CallerId("Forwarding");
-			
 			DataPath datapath = new DataPath();
 			datapath.setSrcPort(srcSwitchPort);
 			datapath.setDstPort(dstSwitchPort);
 			
 			flowPath = new FlowPath();
-			flowPath.setInstallerId(callerId);
+			flowPath.setInstallerId(new CallerId(callerId));
 	
 			flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
 			flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
@@ -360,7 +353,7 @@
 			
 			// TODO implement copy constructor for FlowPath
 			reverseFlowPath = new FlowPath();
-			reverseFlowPath.setInstallerId(callerId);
+			reverseFlowPath.setInstallerId(new CallerId(callerId));
 			reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
 			reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
 			reverseFlowPath.setIdleTimeout(IDLE_TIMEOUT);
@@ -372,9 +365,7 @@
 			reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
 			reverseFlowPath.setDataPath(reverseDataPath);
 			reverseFlowPath.dataPath().srcPort().dpid().toString();
-			
-			// TODO what happens if no path exists? cleanup
-			
+
 			FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
 			FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
 			
@@ -382,50 +373,23 @@
 			reverseFlowPath.setFlowId(reverseFlowId);
 			
 			OFPacketOut po = constructPacketOut(pi, sw);
-			Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort, 
-					dstMacAddress, srcMacAddress);
+			Path reversePathSpec = new Path(dstMacAddress, srcMacAddress);
 			
 			// Add to waiting lists
-			//pendingFlows.put(pathspec, flowId.value());
-			//pendingFlows.put(reversePathSpec, reverseFlowId.value());
 			pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
 			pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
 			waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
 		
 		}
 		
+		log.debug("Adding reverse {} to {} flowid {}", new Object[] {
+				dstMacAddress, srcMacAddress, reverseFlowPath.flowId()});
 		flowService.addFlow(reverseFlowPath);
+		log.debug("Adding forward {} to {} flowid {}", new Object[] {
+				srcMacAddress, dstMacAddress, flowPath.flowId()});
 		flowService.addFlow(flowPath);
 		
 	}
-	
-	/*
-	private boolean flowExists(SwitchPort srcPort, MACAddress srcMac, 
-			SwitchPort dstPort, MACAddress dstMac) {
-		for (FlowPath flow : datagridService.getAllFlows()) {
-			FlowEntryMatch match = flow.flowEntryMatch();
-			// TODO implement FlowEntryMatch.equals();
-			// This is painful to do properly without support in the FlowEntryMatch
-			boolean same = true;
-			if (!match.srcMac().equals(srcMac) ||
-				!match.dstMac().equals(dstMac)) {
-				same = false;
-			}
-			if (!flow.dataPath().srcPort().equals(srcPort) || 
-				!flow.dataPath().dstPort().equals(dstPort)) {
-				same = false;
-			}
-			
-			if (same) {
-				log.debug("found flow entry that's the same {}-{}:::{}-{}",
-						new Object[] {srcPort, srcMac, dstPort, dstMac});
-				return true;
-			}
-		}
-		
-		return false;
-	}
-	*/
 
 	private OFPacketOut constructPacketOut(OFPacketIn pi, IOFSwitch sw) {	
 		OFPacketOut po = new OFPacketOut();
@@ -452,36 +416,64 @@
 			flowInstalled(flowPath);
 		}
 	}
+	
+	@Override
+	public void flowRemoved(FlowPath removedFlowPath) {
+		if (!removedFlowPath.installerId().equals(callerId)) {
+			// Not our flow path, ignore
+			return;
+		}
+		
+		MACAddress srcMacAddress = removedFlowPath.flowEntryMatch().srcMac();
+		MACAddress dstMacAddress = removedFlowPath.flowEntryMatch().dstMac();
+		
+		Path removedPath = new Path(srcMacAddress, dstMacAddress);
+		
+		synchronized (lock) {
+			pendingFlows.remove(removedPath);
+			
+			// There *shouldn't* be any packets queued if the flow has 
+			// just been removed. 
+			List<PacketToPush> packets = 
+					waitingPackets.removeAll(removedFlowPath.flowId().value());
+			if (!packets.isEmpty()) {
+				log.warn("Removed flow {} has packets queued", 
+						removedFlowPath.flowId());
+			}
+		}
+	}
 
 	private void flowInstalled(FlowPath installedFlowPath) {
 		long flowId = installedFlowPath.flowId().value();
 		
+		if (!installedFlowPath.installerId().equals(callerId)) {
+			// Not our flow path, ignore
+			return;
+		}
+		
+		// TODO waiting packets should time out. We could request a path that
+		// can't be installed right now because of a network partition. The path
+		// may eventually be installed, but we may have received thousands of 
+		// packets in the meantime and probably don't want to send very old packets.
 		short outPort = 
 				installedFlowPath.flowEntries().get(0).outPort().value();
 		
 		MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
 		MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
 		
-		if (srcMacAddress == null || dstMacAddress == null) {
-			// Not our flow path, ignore
-			return;
-		}
-		
 		Collection<PacketToPush> packets;
 		synchronized (lock) {
-			log.debug("Flow {} has been installed, sending queued packets",
-					installedFlowPath.flowId());
-			
 			packets = waitingPackets.removeAll(flowId);
 			
+			log.debug("Flow {} has been installed, sending {} queued packets",
+					installedFlowPath.flowId(), packets.size());
+			
 			// remove pending flows entry
-			Path installedPath = new Path(installedFlowPath.dataPath().srcPort(),
-					installedFlowPath.dataPath().dstPort(),
-					srcMacAddress, dstMacAddress);
-			//pendingFlows.remove(pathToRemove);
+			Path installedPath = new Path(srcMacAddress, dstMacAddress);
 			PushedFlow existingFlow = pendingFlows.get(installedPath);
-			if (existingFlow != null)
-			    existingFlow.firstHopOutPort = outPort;
+			if (existingFlow != null) {
+			    existingFlow.installed = true;
+			}
 		}
 		
 		for (PacketToPush packet : packets) {
@@ -499,4 +491,5 @@
 		
 		flowPusher.add(sw, po);
 	}
+
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
index e5bd714..0e0d1da 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
@@ -22,4 +22,12 @@
 	 * been installed in the network.
 	 */
 	public void flowsInstalled(Collection<FlowPath> installedFlowPaths);
+	
+	/**
+	 * Notify the Forwarding module that a flow has expired and been 
+	 * removed from the network.
+	 * 
+	 * @param removedFlowPath The FlowPath that was removed
+	 */
+	public void flowRemoved(FlowPath removedFlowPath);
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java b/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
index 0607533..ec18f09 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/CallerId.java
@@ -12,6 +12,14 @@
      * Default constructor.
      */
     public CallerId() {}
+    
+    /**
+     * Copy constructor
+     * @param otherCallerId
+     */
+    public CallerId(CallerId otherCallerId) {
+    value = otherCallerId.value;
+    }
 
     /**
      * Constructor from a string value.
@@ -49,4 +57,20 @@
     public String toString() {
 	return value;
     }
+    
+    @Override
+    public boolean equals(Object other) {
+    if (!(other instanceof CallerId)) {
+        return false;
+    }
+    
+    CallerId otherCallerId = (CallerId) other;
+    
+    return value.equals(otherCallerId.value);
+    }
+    
+    @Override
+    public int hashCode() {
+    return value.hashCode();
+    }
 }