Changed the model of Forwarding to allow for flow installed notifications
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index f33f986..5a517e8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -6,6 +6,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -46,7 +47,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class Forwarding implements IOFMessageListener, IFloodlightModule {
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+
+public class Forwarding implements IOFMessageListener, IFloodlightModule,
+ IForwardingService {
private final static Logger log = LoggerFactory.getLogger(Forwarding.class);
private IFloodlightProviderService floodlightProvider;
@@ -56,8 +62,48 @@
private IDeviceStorage deviceStorage;
private TopologyManager topologyService;
- public Forwarding() {
+ private Map<Path, Long> pendingFlows;
+ private Multimap<Long, PacketToPush> waitingPackets;
+
+ public class PacketToPush {
+ public final OFPacketOut packet;
+ public final long dpid;
+ public PacketToPush(OFPacketOut packet, long dpid) {
+ this.packet = packet;
+ this.dpid = dpid;
+ }
+ }
+
+ public final class Path {
+ public final SwitchPort srcPort;
+ public final SwitchPort dstPort;
+
+ public Path(SwitchPort src, SwitchPort dst) {
+ srcPort = new SwitchPort(new Dpid(src.dpid().value()),
+ new Port(src.port().value()));
+ dstPort = new SwitchPort(new Dpid(dst.dpid().value()),
+ new Port(dst.port().value()));
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof Path)) {
+ return false;
+ }
+
+ Path otherPath = (Path) other;
+ return srcPort.equals(otherPath.srcPort) &&
+ dstPort.equals(otherPath.dstPort);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 17;
+ hash = 31 * hash + srcPort.hashCode();
+ hash = 31 * hash + dstPort.hashCode();
+ return hash;
+ }
}
@Override
@@ -89,6 +135,10 @@
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
+ pendingFlows = new ConcurrentHashMap<Path, Long>();
+ waitingPackets = Multimaps.synchronizedSetMultimap(
+ HashMultimap.<Long, PacketToPush>create());
+
deviceStorage = new DeviceStorageImpl();
deviceStorage.init("");
topologyService = new TopologyManager();
@@ -141,7 +191,8 @@
}
private void handlePacketIn(IOFSwitch sw, OFPacketIn pi, Ethernet eth) {
- String destinationMac = HexString.toHexString(eth.getDestinationMACAddress());
+ String destinationMac =
+ HexString.toHexString(eth.getDestinationMACAddress());
IDeviceObject deviceObject = deviceStorage.getDeviceByMac(
destinationMac);
@@ -151,7 +202,7 @@
return;
}
- Iterator<IPortObject> ports = deviceObject.getAttachedPorts().iterator();
+ Iterator<IPortObject> ports = deviceObject.getAttachedPorts().iterator();
if (!ports.hasNext()) {
log.debug("No attachment point found for device {}", destinationMac);
return;
@@ -171,46 +222,32 @@
MACAddress srcMacAddress = MACAddress.valueOf(eth.getSourceMACAddress());
MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
- if (flowExists(srcSwitchPort, srcMacAddress,
- dstSwitchPort, dstMacAddress)) {
- log.debug("Not adding flow because it already exists");
+
+ DataPath datapath = new DataPath();
+ datapath.setSrcPort(srcSwitchPort);
+ datapath.setDstPort(dstSwitchPort);
+
+
+
+ Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
+ // TODO check concurrency
+ Long existingFlowId = pendingFlows.get(pathspec);
+
+ if (existingFlowId != null) {
+ log.debug("Found existing flow {}",
+ HexString.toHexString(existingFlowId));
- // TODO check reverse flow as well
-
- DataPath shortestPath =
- topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
-
- if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
- log.warn("No path found between {} and {} - not handling packet",
- srcSwitchPort, dstSwitchPort);
- return;
- }
-
- Port outPort = shortestPath.flowEntries().get(0).outPort();
- forwardPacket(pi, sw, outPort.value());
+ // TODO do stuff.
+ OFPacketOut po = constructPacketOut(datapath, pi, sw);
+ waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
return;
}
- // Calculate a shortest path before pushing flow mods.
- // This will be used later by the packet-out processing, but it uses
- // the database so will be slow, and we should do it before flow mods.
- DataPath shortestPath =
- topologyService.getDatabaseShortestPath(srcSwitchPort, dstSwitchPort);
-
- if (shortestPath == null || shortestPath.flowEntries().isEmpty()) {
- log.warn("No path found between {} and {} - not handling packet",
- srcSwitchPort, dstSwitchPort);
- return;
- }
log.debug("Adding new flow between {} at {} and {} at {}",
new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
- DataPath dataPath = new DataPath();
- dataPath.setSrcPort(srcSwitchPort);
- dataPath.setDstPort(dstSwitchPort);
-
CallerId callerId = new CallerId("Forwarding");
//FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
@@ -226,10 +263,9 @@
// For now just forward IPv4 packets. This prevents accidentally
// forwarding other stuff like ARP.
flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
- flowPath.setDataPath(dataPath);
+ flowPath.setDataPath(datapath);
- FlowId flowId = flowService.addFlow(flowPath);
- //flowService.addFlow(flowPath, flowId);
+
DataPath reverseDataPath = new DataPath();
@@ -237,7 +273,6 @@
reverseDataPath.setSrcPort(dstSwitchPort);
reverseDataPath.setDstPort(srcSwitchPort);
- //FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
// TODO implement copy constructor for FlowPath
FlowPath reverseFlowPath = new FlowPath();
//reverseFlowPath.setFlowId(reverseFlowId);
@@ -253,13 +288,28 @@
reverseFlowPath.dataPath().srcPort().dpid().toString();
// TODO what happens if no path exists?
- //flowService.addFlow(reverseFlowPath, reverseFlowId);
- FlowId reverseFlowId = flowService.addFlow(reverseFlowPath);
- Port outPort = shortestPath.flowEntries().get(0).outPort();
- forwardPacket(pi, sw, outPort.value());
+ FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
+ FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
+
+ flowPath.setFlowId(flowId);
+ reverseFlowPath.setFlowId(reverseFlowId);
+
+ OFPacketOut po = constructPacketOut(datapath, pi, sw);
+ Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+
+ // Add to waiting lists
+ pendingFlows.put(pathspec, flowId.value());
+ pendingFlows.put(reversePathSpec, reverseFlowId.value());
+ waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
+
+
+
+ flowService.addFlow(reverseFlowPath);
+ flowService.addFlow(flowPath);
}
+ /*
private boolean flowExists(SwitchPort srcPort, MACAddress srcMac,
SwitchPort dstPort, MACAddress dstMac) {
for (FlowPath flow : datagridService.getAllFlows()) {
@@ -285,17 +335,18 @@
return false;
}
+ */
- private void forwardPacket(OFPacketIn pi, IOFSwitch sw, short port) {
- List<OFAction> actions = new ArrayList<OFAction>(1);
- actions.add(new OFActionOutput(port));
+ private OFPacketOut constructPacketOut(DataPath datapath, OFPacketIn pi,
+ IOFSwitch sw) {
+ //List<OFAction> actions = new ArrayList<OFAction>(1);
+ //actions.add(new OFActionOutput(port));
OFPacketOut po = new OFPacketOut();
po.setInPort(OFPort.OFPP_NONE)
.setInPort(pi.getInPort())
- .setActions(actions)
- .setActionsLength((short)OFActionOutput.MINIMUM_LENGTH)
- .setLengthU(OFPacketOut.MINIMUM_LENGTH + OFActionOutput.MINIMUM_LENGTH);
+ .setActions(new ArrayList<OFAction>())
+ .setLengthU(OFPacketOut.MINIMUM_LENGTH);
if (sw.getBuffers() == 0) {
po.setBufferId(OFPacketOut.BUFFER_ID_NONE)
@@ -306,11 +357,39 @@
po.setBufferId(pi.getBufferId());
}
- try {
- sw.write(po, null);
- sw.flush();
- } catch (IOException e) {
- log.error("Error writing packet out to switch: {}", e);
+ return po;
+ }
+
+ @Override
+ public void flowInstalled(FlowPath installedFlowPath) {
+ // TODO check concurrency
+ // will need to sync and access both collections at once.
+ long flowId = installedFlowPath.flowId().value();
+ Collection<PacketToPush> packets = waitingPackets.removeAll(flowId);
+
+ //remove pending flows entry
+ Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
+ installedFlowPath.dataPath().dstPort());
+ pendingFlows.remove(pathToRemove);
+
+ for (PacketToPush packet : packets) {
+ IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
+
+ OFPacketOut po = packet.packet;
+ short outPort =
+ installedFlowPath.flowEntries().get(0).outPort().value();
+ po.getActions().add(new OFActionOutput(outPort));
+ po.setActionsLength((short)
+ (po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
+ po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
+
+ try {
+ sw.write(packet.packet, null);
+ sw.flush();
+ } catch (IOException e) {
+ log.error("Error writing packet out to switch {}:",
+ sw.getId(), e);
+ }
}
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
new file mode 100644
index 0000000..07f6733
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/IForwardingService.java
@@ -0,0 +1,21 @@
+package net.onrc.onos.ofcontroller.forwarding;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Temporary interface that allows the Forwarding module to be
+ * notified when a flow has been installed by the FlowManager.
+ *
+ * This should be refactored to a listener framework in the future.
+ * @author jono
+ *
+ */
+public interface IForwardingService extends IFloodlightService {
+ /**
+ * Notify the Forwarding module that a flow has been installed
+ * in the network.
+ * @param flowPath The FlowPath object describing the installed flow
+ */
+ public void flowInstalled(FlowPath flowPath);
+}