Merge pull request #475 from jonohart/fw

Implemented the bulk of the changes to the Forwarding module
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index f33f986..6918421 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -3,9 +3,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.floodlightcontroller.core.FloodlightContext;
 import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -46,7 +48,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class Forwarding implements IOFMessageListener, IFloodlightModule {
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+public class Forwarding implements IOFMessageListener, IFloodlightModule,
+									IForwardingService {
 	private final static Logger log = LoggerFactory.getLogger(Forwarding.class);
 
 	private IFloodlightProviderService floodlightProvider;
@@ -56,18 +63,64 @@
 	private IDeviceStorage deviceStorage;
 	private TopologyManager topologyService;
 	
-	public Forwarding() {
+	private Map<Path, Long> pendingFlows;
+	private Multimap<Long, PacketToPush> waitingPackets;
+	
+	public class PacketToPush {
+		public final OFPacketOut packet;
+		public final long dpid;
 		
+		public PacketToPush(OFPacketOut packet, long dpid) {
+			this.packet = packet;
+			this.dpid = dpid;
+		}
+	}
+	
+	public final class Path {
+		public final SwitchPort srcPort;
+		public final SwitchPort dstPort;
+		
+		public Path(SwitchPort src, SwitchPort dst) {
+			srcPort = new SwitchPort(new Dpid(src.dpid().value()), 
+					new Port(src.port().value()));
+			dstPort = new SwitchPort(new Dpid(dst.dpid().value()), 
+					new Port(dst.port().value()));
+		}
+		
+		@Override
+		public boolean equals(Object other) {
+			if (!(other instanceof Path)) {
+				return false;
+			}
+			
+			Path otherPath = (Path) other;
+			return srcPort.equals(otherPath.srcPort) && 
+					dstPort.equals(otherPath.dstPort);
+		}
+		
+		@Override
+		public int hashCode() {
+			int hash = 17;
+			hash = 31 * hash + srcPort.hashCode();
+			hash = 31 * hash + dstPort.hashCode();
+			return hash;
+		}
 	}
 	
 	@Override
 	public Collection<Class<? extends IFloodlightService>> getModuleServices() {
-		return null;
+		List<Class<? extends IFloodlightService>> services = 
+				new ArrayList<Class<? extends IFloodlightService>>(1);
+		services.add(IForwardingService.class);
+		return services;
 	}
 
 	@Override
 	public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
-		return null;
+		Map<Class<? extends IFloodlightService>, IFloodlightService> impls = 
+				new HashMap<Class<? extends IFloodlightService>, IFloodlightService>(1);
+		impls.put(IForwardingService.class, this);
+		return impls;
 	}
 
 	@Override
@@ -89,6 +142,10 @@
 		
 		floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
 		
+		pendingFlows = new ConcurrentHashMap<Path, Long>();
+		waitingPackets = Multimaps.synchronizedSetMultimap(
+				HashMultimap.<Long, PacketToPush>create());
+		
 		deviceStorage = new DeviceStorageImpl();
 		deviceStorage.init("");
 		topologyService = new TopologyManager();
@@ -141,7 +198,8 @@
 	}
 	
 	private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
-		String destinationMac = HexString.toHexString(eth.getDestinationMACAddress()); 
+		String destinationMac = 
+				HexString.toHexString(eth.getDestinationMACAddress()); 
 		
 		IDeviceObject deviceObject = deviceStorage.getDeviceByMac(
 				destinationMac);
@@ -151,7 +209,7 @@
 			return;
 		}
 		
-		Iterator<IPortObject> ports = deviceObject.getAttachedPorts().iterator();
+		Iterator<IPortObject> ports = deviceObject.getAttachedPorts().iterator();	
 		if (!ports.hasNext()) {
 			log.debug("No attachment point found for device {}", destinationMac);
 			return;
@@ -171,46 +229,32 @@
 		MACAddress srcMacAddress = MACAddress.valueOf(eth.getSourceMACAddress());
 		MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
 		
-		if (flowExists(srcSwitchPort, srcMacAddress, 
-				dstSwitchPort, dstMacAddress)) {
-			log.debug("Not adding flow because it already exists");
+		
+		DataPath datapath = new DataPath();
+		datapath.setSrcPort(srcSwitchPort);
+		datapath.setDstPort(dstSwitchPort);
+		
+		
+		
+		Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
+		// TODO check concurrency
+		Long existingFlowId = pendingFlows.get(pathspec);
+		
+		if (existingFlowId != null) {
+			log.debug("Found existing flow {}", 
+					HexString.toHexString(existingFlowId));
 			
-			// TODO check reverse flow as well
-			
-			DataPath shortestPath = 
-					topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
-			
-			if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
-				log.warn("No path found between {} and {} - not handling packet",
-						srcSwitchPort, dstSwitchPort);
-				return;
-			}
-			
-			Port outPort = shortestPath.flowEntries().get(0).outPort();
-			forwardPacket(pi, sw, outPort.value());
+			// TODO do stuff.
+			OFPacketOut po = constructPacketOut(datapath, pi, sw);
+			waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
 			return;
 		}
 		
-		// Calculate a shortest path before pushing flow mods.
-		// This will be used later by the packet-out processing, but it uses
-		// the database so will be slow, and we should do it before flow mods.
-		DataPath shortestPath = 
-				topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
-		
-		if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
-			log.warn("No path found between {} and {} - not handling packet",
-					srcSwitchPort, dstSwitchPort);
-			return;
-		}
 		
 		log.debug("Adding new flow between {} at {} and {} at {}",
 				new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
 		
 		
-		DataPath dataPath = new DataPath();
-		dataPath.setSrcPort(srcSwitchPort);
-		dataPath.setDstPort(dstSwitchPort);
-		
 		CallerId callerId = new CallerId("Forwarding");
 		
 		//FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
@@ -226,10 +270,9 @@
 		// For now just forward IPv4 packets. This prevents accidentally
 		// forwarding other stuff like ARP.
 		flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
-		flowPath.setDataPath(dataPath);
+		flowPath.setDataPath(datapath);
 			
-		FlowId flowId = flowService.addFlow(flowPath);
-		//flowService.addFlow(flowPath, flowId);
+		
 		
 		
 		DataPath reverseDataPath = new DataPath();
@@ -237,7 +280,6 @@
 		reverseDataPath.setSrcPort(dstSwitchPort);
 		reverseDataPath.setDstPort(srcSwitchPort);
 		
-		//FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
 		// TODO implement copy constructor for FlowPath
 		FlowPath reverseFlowPath = new FlowPath();
 		//reverseFlowPath.setFlowId(reverseFlowId);
@@ -253,13 +295,28 @@
 		reverseFlowPath.dataPath().srcPort().dpid().toString();
 		
 		// TODO what happens if no path exists?
-		//flowService.addFlow(reverseFlowPath, reverseFlowId);
-		FlowId reverseFlowId = flowService.addFlow(reverseFlowPath);
 		
-		Port outPort = shortestPath.flowEntries().get(0).outPort();
-		forwardPacket(pi, sw, outPort.value());
+		FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
+		FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
+		
+		flowPath.setFlowId(flowId);
+		reverseFlowPath.setFlowId(reverseFlowId);
+		
+		OFPacketOut po = constructPacketOut(datapath, pi, sw);
+		Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+		
+		// Add to waiting lists
+		pendingFlows.put(pathspec, flowId.value());
+		pendingFlows.put(reversePathSpec, reverseFlowId.value());
+		waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
+		
+		
+		
+		flowService.addFlow(reverseFlowPath);
+		flowService.addFlow(flowPath);
 	}
 	
+	/*
 	private boolean flowExists(SwitchPort srcPort, MACAddress srcMac, 
 			SwitchPort dstPort, MACAddress dstMac) {
 		for (FlowPath flow : datagridService.getAllFlows()) {
@@ -285,17 +342,18 @@
 		
 		return false;
 	}
+	*/
 
-	private void forwardPacket(OFPacketIn pi, IOFSwitch sw, short port) {
-		List<OFAction> actions = new ArrayList<OFAction>(1);
-		actions.add(new OFActionOutput(port));
+	private OFPacketOut constructPacketOut(DataPath datapath, OFPacketIn pi, 
+			IOFSwitch sw) {
+		//List<OFAction> actions = new ArrayList<OFAction>(1);
+		//actions.add(new OFActionOutput(port));
 		
 		OFPacketOut po = new OFPacketOut();
 		po.setInPort(OFPort.OFPP_NONE)
 		.setInPort(pi.getInPort())
-		.setActions(actions)
-		.setActionsLength((short)OFActionOutput.MINIMUM_LENGTH)
-		.setLengthU(OFPacketOut.MINIMUM_LENGTH + OFActionOutput.MINIMUM_LENGTH);
+		.setActions(new ArrayList<OFAction>())
+		.setLengthU(OFPacketOut.MINIMUM_LENGTH);
 		
 		if (sw.getBuffers() == 0) {
 			po.setBufferId(OFPacketOut.BUFFER_ID_NONE)
@@ -306,11 +364,39 @@
 			po.setBufferId(pi.getBufferId());
 		}
 		
-		try {
-			sw.write(po, null);
-			sw.flush();
-		} catch (IOException e) {
-			log.error("Error writing packet out to switch: {}", e);
+		return po;
+	}
+	
+	@Override
+	public void flowInstalled(FlowPath installedFlowPath) {
+		// TODO check concurrency
+		// will need to sync and access both collections at once.
+		long flowId = installedFlowPath.flowId().value();
+		Collection<PacketToPush> packets = waitingPackets.removeAll(flowId);
+		
+		//remove pending flows entry
+		Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
+				installedFlowPath.dataPath().dstPort());
+		pendingFlows.remove(pathToRemove);
+		
+		for (PacketToPush packet : packets) {
+			IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
+			
+			OFPacketOut po = packet.packet;
+			short outPort = 
+					installedFlowPath.flowEntries().get(0).outPort().value();
+			po.getActions().add(new OFActionOutput(outPort));
+			po.setActionsLength((short)
+					(po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
+			po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
+			
+			try {
+				sw.write(packet.packet, null);
+				sw.flush();
+			} catch (IOException e) {
+				log.error("Error writing packet out to switch {}:", 
+						sw.getId(), e);
+			}
 		}
 	}
 }
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
new file mode 100644
index 0000000..07f6733
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
@@ -0,0 +1,21 @@
+package net.onrc.onos.ofcontroller.forwarding;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Temporary interface that allows the Forwarding module to be
+ * notified when a flow has been installed by the FlowManager.
+ * 
+ * This should be refactored to a listener framework in the future.
+ * @author jono
+ *
+ */
+public interface IForwardingService extends IFloodlightService {
+	/**
+	 * Notify the Forwarding module that a flow has been installed
+	 * in the network. 
+	 * @param flowPath The FlowPath object describing the installed flow
+	 */
+	public void flowInstalled(FlowPath flowPath);
+}