Merge pull request #505 from jonohart/fw
Forwarding bug fixes
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 02972e2..133f29c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -7,7 +7,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -62,6 +61,8 @@
private final int IDLE_TIMEOUT = 5; // seconds
private final int HARD_TIMEOUT = 0; // seconds
+
+ private final int PATH_PUSHED_TIMEOUT = 3000; // milliseconds
private IFloodlightProviderService floodlightProvider;
private IFlowService flowService;
@@ -71,12 +72,15 @@
private IDeviceStorage deviceStorage;
private TopologyManager topologyService;
- private Map<Path, Long> pendingFlows;
+ //private Map<Path, Long> pendingFlows;
+ // TODO it seems there is a Guava collection that will time out entries.
+ // We should see if this will work here.
+ private Map<Path, PushedFlow> pendingFlows;
private ListMultimap<Long, PacketToPush> waitingPackets;
private final Object lock = new Object();
- public class PacketToPush {
+ private class PacketToPush {
public final OFPacketOut packet;
public final long dpid;
@@ -86,15 +90,35 @@
}
}
- public final class Path {
+ private class PushedFlow {
+ public final long flowId;
+ private final long pushedTime;
+ public short firstHopOutPort = OFPort.OFPP_NONE.getValue();
+
+ public PushedFlow(long flowId) {
+ this.flowId = flowId;
+ pushedTime = System.currentTimeMillis();
+ }
+
+ public boolean isExpired() {
+ return (System.currentTimeMillis() - pushedTime) > PATH_PUSHED_TIMEOUT;
+ }
+ }
+
+ private final class Path {
public final SwitchPort srcPort;
public final SwitchPort dstPort;
+ public final MACAddress srcMac;
+ public final MACAddress dstMac;
- public Path(SwitchPort src, SwitchPort dst) {
+ public Path(SwitchPort src, SwitchPort dst,
+ MACAddress srcMac, MACAddress dstMac) {
srcPort = new SwitchPort(new Dpid(src.dpid().value()),
new Port(src.port().value()));
dstPort = new SwitchPort(new Dpid(dst.dpid().value()),
new Port(dst.port().value()));
+ this.srcMac = srcMac;
+ this.dstMac = dstMac;
}
@Override
@@ -105,7 +129,9 @@
Path otherPath = (Path) other;
return srcPort.equals(otherPath.srcPort) &&
- dstPort.equals(otherPath.dstPort);
+ dstPort.equals(otherPath.dstPort) &&
+ srcMac.equals(otherPath.srcMac) &&
+ dstMac.equals(otherPath.dstMac);
}
@Override
@@ -113,8 +139,16 @@
int hash = 17;
hash = 31 * hash + srcPort.hashCode();
hash = 31 * hash + dstPort.hashCode();
+ hash = 31 * hash + srcMac.hashCode();
+ hash = 31 * hash + dstMac.hashCode();
return hash;
}
+
+ @Override
+ public String toString() {
+ return "(" + srcMac + " at " + srcPort + ") => ("
+ + dstPort + " at " + dstMac + ")";
+ }
}
@Override
@@ -154,7 +188,8 @@
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
- pendingFlows = new ConcurrentHashMap<Path, Long>();
+ //pendingFlows = new ConcurrentHashMap<Path, Long>();
+ pendingFlows = new HashMap<Path, PushedFlow>();
//waitingPackets = Multimaps.synchronizedSetMultimap(
//HashMultimap.<Long, PacketToPush>create());
//waitingPackets = HashMultimap.create();
@@ -280,20 +315,35 @@
FlowPath flowPath, reverseFlowPath;
- Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
+ Path pathspec = new Path(srcSwitchPort, dstSwitchPort,
+ srcMacAddress, dstMacAddress);
// TODO check concurrency
synchronized (lock) {
- Long existingFlowId = pendingFlows.get(pathspec);
+ PushedFlow existingFlow = pendingFlows.get(pathspec);
+ //Long existingFlowId = pendingFlows.get(pathspec);
- if (existingFlowId != null) {
+ if (existingFlow != null && !existingFlow.isExpired()) {
log.debug("Found existing flow {}",
- HexString.toHexString(existingFlowId));
+ HexString.toHexString(existingFlow.flowId));
OFPacketOut po = constructPacketOut(pi, sw);
- waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
+
+ if (existingFlow.firstHopOutPort != OFPort.OFPP_NONE.getValue()) {
+ // Flow has been sent to the switches so it is safe to
+ // send a packet out now
+ sendPacketOut(sw, po, existingFlow.firstHopOutPort);
+ }
+ else {
+ // Flow has not yet been sent to switches so save the
+ // packet out for later
+ waitingPackets.put(existingFlow.flowId,
+ new PacketToPush(po, sw.getId()));
+ }
return;
}
+ //log.debug("Couldn't match {} in {}", pathspec, pendingFlows);
+
log.debug("Adding new flow between {} at {} and {} at {}",
new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
@@ -347,11 +397,14 @@
reverseFlowPath.setFlowId(reverseFlowId);
OFPacketOut po = constructPacketOut(pi, sw);
- Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+ Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort,
+ dstMacAddress, srcMacAddress);
// Add to waiting lists
- pendingFlows.put(pathspec, flowId.value());
- pendingFlows.put(reversePathSpec, reverseFlowId.value());
+ //pendingFlows.put(pathspec, flowId.value());
+ //pendingFlows.put(reversePathSpec, reverseFlowId.value());
+ pendingFlows.put(pathspec, new PushedFlow(flowId.value()));
+ pendingFlows.put(reversePathSpec, new PushedFlow(reverseFlowId.value()));
waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
}
@@ -416,33 +469,42 @@
}
private void flowInstalled(FlowPath installedFlowPath) {
- // TODO check concurrency
- // will need to sync and access both collections at once.
long flowId = installedFlowPath.flowId().value();
+ short outPort =
+ installedFlowPath.flowEntries().get(0).outPort().value();
+
+ MACAddress srcMacAddress = installedFlowPath.flowEntryMatch().srcMac();
+ MACAddress dstMacAddress = installedFlowPath.flowEntryMatch().dstMac();
+
Collection<PacketToPush> packets;
synchronized (lock) {
+ log.debug("Flow {} has been installed, sending queued packets",
+ installedFlowPath.flowId());
+
packets = waitingPackets.removeAll(flowId);
- //remove pending flows entry
- Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
- installedFlowPath.dataPath().dstPort());
- pendingFlows.remove(pathToRemove);
-
+ // remove pending flows entry
+ Path installedPath = new Path(installedFlowPath.dataPath().srcPort(),
+ installedFlowPath.dataPath().dstPort(),
+ srcMacAddress, dstMacAddress);
+ //pendingFlows.remove(pathToRemove);
+ pendingFlows.get(installedPath).firstHopOutPort = outPort;
}
for (PacketToPush packet : packets) {
IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
- OFPacketOut po = packet.packet;
- short outPort =
- installedFlowPath.flowEntries().get(0).outPort().value();
- po.getActions().add(new OFActionOutput(outPort));
- po.setActionsLength((short)
- (po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
- po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
-
- flowPusher.add(sw, po);
+ sendPacketOut(sw, packet.packet, outPort);
}
}
+
+ private void sendPacketOut(IOFSwitch sw, OFPacketOut po, short outPort) {
+ po.getActions().add(new OFActionOutput(outPort));
+ po.setActionsLength((short)
+ (po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
+ po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
+
+ flowPusher.add(sw, po);
+ }
}