Added course synchronization, FlowPusher, and temporary flows
diff --git a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
index 6918421..2370936 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/forwarding/Forwarding.java
@@ -1,6 +1,5 @@
package net.onrc.onos.ofcontroller.forwarding;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -18,13 +17,13 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.util.MACAddress;
-import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.core.IDeviceStorage;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
import net.onrc.onos.ofcontroller.topology.TopologyManager;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPath;
@@ -50,15 +49,17 @@
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Multimaps;
public class Forwarding implements IOFMessageListener, IFloodlightModule,
IForwardingService {
private final static Logger log = LoggerFactory.getLogger(Forwarding.class);
+ private final int IDLE_TIMEOUT = 5; // seconds
+ private final int HARD_TIMEOUT = 0; // seconds
+
private IFloodlightProviderService floodlightProvider;
private IFlowService flowService;
- private IDatagridService datagridService;
+ private IFlowPusherService flowPusher;
private IDeviceStorage deviceStorage;
private TopologyManager topologyService;
@@ -66,6 +67,8 @@
private Map<Path, Long> pendingFlows;
private Multimap<Long, PacketToPush> waitingPackets;
+ private final Object lock = new Object();
+
public class PacketToPush {
public final OFPacketOut packet;
public final long dpid;
@@ -129,22 +132,23 @@
new ArrayList<Class<? extends IFloodlightService>>();
dependencies.add(IFloodlightProviderService.class);
dependencies.add(IFlowService.class);
- dependencies.add(IDatagridService.class);
+ dependencies.add(IFlowPusherService.class);
return dependencies;
}
@Override
public void init(FloodlightModuleContext context) {
- this.floodlightProvider =
+ floodlightProvider =
context.getServiceImpl(IFloodlightProviderService.class);
- this.flowService = context.getServiceImpl(IFlowService.class);
- this.datagridService = context.getServiceImpl(IDatagridService.class);
+ flowService = context.getServiceImpl(IFlowService.class);
+ flowPusher = context.getServiceImpl(IFlowPusherService.class);
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
pendingFlows = new ConcurrentHashMap<Path, Long>();
- waitingPackets = Multimaps.synchronizedSetMultimap(
- HashMultimap.<Long, PacketToPush>create());
+ //waitingPackets = Multimaps.synchronizedSetMultimap(
+ //HashMultimap.<Long, PacketToPush>create());
+ waitingPackets = HashMultimap.create();
deviceStorage = new DeviceStorageImpl();
deviceStorage.init("");
@@ -230,90 +234,88 @@
MACAddress dstMacAddress = MACAddress.valueOf(eth.getDestinationMACAddress());
- DataPath datapath = new DataPath();
- datapath.setSrcPort(srcSwitchPort);
- datapath.setDstPort(dstSwitchPort);
-
-
+ FlowPath flowPath, reverseFlowPath;
Path pathspec = new Path(srcSwitchPort, dstSwitchPort);
// TODO check concurrency
- Long existingFlowId = pendingFlows.get(pathspec);
-
- if (existingFlowId != null) {
- log.debug("Found existing flow {}",
- HexString.toHexString(existingFlowId));
+ synchronized (lock) {
+ Long existingFlowId = pendingFlows.get(pathspec);
- // TODO do stuff.
- OFPacketOut po = constructPacketOut(datapath, pi, sw);
- waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
- return;
+ if (existingFlowId != null) {
+ log.debug("Found existing flow {}",
+ HexString.toHexString(existingFlowId));
+
+ OFPacketOut po = constructPacketOut(pi, sw);
+ waitingPackets.put(existingFlowId, new PacketToPush(po, sw.getId()));
+ return;
+ }
+
+
+ log.debug("Adding new flow between {} at {} and {} at {}",
+ new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
+
+
+ CallerId callerId = new CallerId("Forwarding");
+
+ DataPath datapath = new DataPath();
+ datapath.setSrcPort(srcSwitchPort);
+ datapath.setDstPort(dstSwitchPort);
+
+ flowPath = new FlowPath();
+ flowPath.setInstallerId(callerId);
+
+ flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+ flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+ flowPath.setFlowEntryMatch(new FlowEntryMatch());
+ flowPath.setIdleTimeout(IDLE_TIMEOUT);
+ flowPath.setHardTimeout(HARD_TIMEOUT);
+ flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
+ flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
+ flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+ flowPath.setDataPath(datapath);
+
+
+ DataPath reverseDataPath = new DataPath();
+ // Reverse the ports for the reverse path
+ reverseDataPath.setSrcPort(dstSwitchPort);
+ reverseDataPath.setDstPort(srcSwitchPort);
+
+ // TODO implement copy constructor for FlowPath
+ reverseFlowPath = new FlowPath();
+ reverseFlowPath.setInstallerId(callerId);
+ reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
+ reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
+ reverseFlowPath.setIdleTimeout(IDLE_TIMEOUT);
+ reverseFlowPath.setHardTimeout(HARD_TIMEOUT);
+ reverseFlowPath.setFlowEntryMatch(new FlowEntryMatch());
+ // Reverse the MAC addresses for the reverse path
+ reverseFlowPath.flowEntryMatch().enableSrcMac(dstMacAddress);
+ reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
+ reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
+ reverseFlowPath.setDataPath(reverseDataPath);
+ reverseFlowPath.dataPath().srcPort().dpid().toString();
+
+ // TODO what happens if no path exists? cleanup
+
+ FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
+ FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
+
+ flowPath.setFlowId(flowId);
+ reverseFlowPath.setFlowId(reverseFlowId);
+
+ OFPacketOut po = constructPacketOut(pi, sw);
+ Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
+
+ // Add to waiting lists
+ pendingFlows.put(pathspec, flowId.value());
+ pendingFlows.put(reversePathSpec, reverseFlowId.value());
+ waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
+
}
-
- log.debug("Adding new flow between {} at {} and {} at {}",
- new Object[]{srcMacAddress, srcSwitchPort, dstMacAddress, dstSwitchPort});
-
-
- CallerId callerId = new CallerId("Forwarding");
-
- //FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
- FlowPath flowPath = new FlowPath();
- //flowPath.setFlowId(flowId);
- flowPath.setInstallerId(callerId);
-
- flowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
- flowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
- flowPath.setFlowEntryMatch(new FlowEntryMatch());
- flowPath.flowEntryMatch().enableSrcMac(srcMacAddress);
- flowPath.flowEntryMatch().enableDstMac(dstMacAddress);
- // For now just forward IPv4 packets. This prevents accidentally
- // forwarding other stuff like ARP.
- flowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
- flowPath.setDataPath(datapath);
-
-
-
-
- DataPath reverseDataPath = new DataPath();
- // Reverse the ports for the reverse path
- reverseDataPath.setSrcPort(dstSwitchPort);
- reverseDataPath.setDstPort(srcSwitchPort);
-
- // TODO implement copy constructor for FlowPath
- FlowPath reverseFlowPath = new FlowPath();
- //reverseFlowPath.setFlowId(reverseFlowId);
- reverseFlowPath.setInstallerId(callerId);
- reverseFlowPath.setFlowPathType(FlowPathType.FP_TYPE_SHORTEST_PATH);
- reverseFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_ADD);
- reverseFlowPath.setFlowEntryMatch(new FlowEntryMatch());
- // Reverse the MAC addresses for the reverse path
- reverseFlowPath.flowEntryMatch().enableSrcMac(dstMacAddress);
- reverseFlowPath.flowEntryMatch().enableDstMac(srcMacAddress);
- reverseFlowPath.flowEntryMatch().enableEthernetFrameType(Ethernet.TYPE_IPv4);
- reverseFlowPath.setDataPath(reverseDataPath);
- reverseFlowPath.dataPath().srcPort().dpid().toString();
-
- // TODO what happens if no path exists?
-
- FlowId flowId = new FlowId(flowService.getNextFlowEntryId());
- FlowId reverseFlowId = new FlowId(flowService.getNextFlowEntryId());
-
- flowPath.setFlowId(flowId);
- reverseFlowPath.setFlowId(reverseFlowId);
-
- OFPacketOut po = constructPacketOut(datapath, pi, sw);
- Path reversePathSpec = new Path(dstSwitchPort, srcSwitchPort);
-
- // Add to waiting lists
- pendingFlows.put(pathspec, flowId.value());
- pendingFlows.put(reversePathSpec, reverseFlowId.value());
- waitingPackets.put(flowId.value(), new PacketToPush(po, sw.getId()));
-
-
-
flowService.addFlow(reverseFlowPath);
flowService.addFlow(flowPath);
+
}
/*
@@ -344,11 +346,7 @@
}
*/
- private OFPacketOut constructPacketOut(DataPath datapath, OFPacketIn pi,
- IOFSwitch sw) {
- //List<OFAction> actions = new ArrayList<OFAction>(1);
- //actions.add(new OFActionOutput(port));
-
+ private OFPacketOut constructPacketOut(OFPacketIn pi, IOFSwitch sw) {
OFPacketOut po = new OFPacketOut();
po.setInPort(OFPort.OFPP_NONE)
.setInPort(pi.getInPort())
@@ -372,12 +370,17 @@
// TODO check concurrency
// will need to sync and access both collections at once.
long flowId = installedFlowPath.flowId().value();
- Collection<PacketToPush> packets = waitingPackets.removeAll(flowId);
- //remove pending flows entry
- Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
- installedFlowPath.dataPath().dstPort());
- pendingFlows.remove(pathToRemove);
+ Collection<PacketToPush> packets;
+ synchronized (lock) {
+ packets = waitingPackets.removeAll(flowId);
+
+ //remove pending flows entry
+ Path pathToRemove = new Path(installedFlowPath.dataPath().srcPort(),
+ installedFlowPath.dataPath().dstPort());
+ pendingFlows.remove(pathToRemove);
+
+ }
for (PacketToPush packet : packets) {
IOFSwitch sw = floodlightProvider.getSwitches().get(packet.dpid);
@@ -390,13 +393,7 @@
(po.getActionsLength() + OFActionOutput.MINIMUM_LENGTH));
po.setLengthU(po.getLengthU() + OFActionOutput.MINIMUM_LENGTH);
- try {
- sw.write(packet.packet, null);
- sw.flush();
- } catch (IOException e) {
- log.error("Error writing packet out to switch {}:",
- sw.getId(), e);
- }
+ flowPusher.add(sw, po);
}
}
}