Implemented a flow cache so we can keep operating if our switches lose connection: cached flows are flushed and re-inserted when a switch reconnects
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index a698fd5..826fd93 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -57,7 +57,6 @@
import org.codehaus.jackson.map.ObjectMapper;
import org.openflow.protocol.OFFlowMod;
import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFType;
@@ -141,6 +140,8 @@
protected Map<Prefix, Path> prefixToPath;
protected Multimap<Prefix, PushedFlowMod> pushedFlows;
+ private FlowCache flowCache;
+
protected volatile Map<Long, ?> topoRouteTopology = null;
protected class TopologyChangeDetector implements Runnable {
@@ -281,6 +282,8 @@
prefixToPath = new HashMap<Prefix, Path>();
pushedFlows = HashMultimap.<Prefix, PushedFlowMod>create();
+ flowCache = new FlowCache(floodlightProvider);
+
bgpUpdatesExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
@@ -549,31 +552,16 @@
actions.add(outputAction);
fm.setActions(actions);
- //Write to switch
- IOFSwitch sw = floodlightProvider.getSwitches()
- .get(srcInterface.getDpid());
-
- if (sw == null){
- log.warn("Switch not found when pushing flow mod");
- continue;
- }
-
- pushedFlows.put(prefix, new PushedFlowMod(sw.getId(), fm));
-
- List<OFMessage> msglist = new ArrayList<OFMessage>();
- msglist.add(fm);
- try {
- sw.write(msglist, null);
- sw.flush();
-
- /*
- * XXX Rate limit hack!
- * This should be solved properly by adding a rate limiting
- * layer on top of the switches if we know they need it.
- */
+ pushedFlows.put(prefix, new PushedFlowMod(srcInterface.getDpid(), fm));
+ flowCache.write(srcInterface.getDpid(), fm);
+
+ /*
+ * XXX Rate limit hack!
+ * This should be solved properly by adding a rate limiting
+ * layer on top of the switches if we know they need it.
+ */
+ try {
Thread.sleep(1);
- } catch (IOException e) {
- log.error("Failure writing flow mod", e);
} catch (InterruptedException e) {
// TODO handle this properly
log.error("Interrupted", e);
@@ -653,24 +641,7 @@
}
private void sendDeleteFlowMod(OFFlowMod addFlowMod, long dpid) {
- addFlowMod.setCommand(OFFlowMod.OFPFC_DELETE_STRICT)
- .setOutPort(OFPort.OFPP_NONE)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH);
-
- addFlowMod.getActions().clear();
-
- IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
- if (sw == null) {
- log.warn("Switch not found when pushing delete flow mod");
- return;
- }
-
- try {
- sw.write(addFlowMod, null);
- sw.flush();
- } catch (IOException e) {
- log.error("Failure writing flow mod", e);
- }
+ flowCache.delete(dpid, addFlowMod);
}
//TODO test next-hop changes
@@ -741,7 +712,8 @@
return;
}
- pushedFlows.addAll(installPath(shortestPath.flowEntries(), dstMacAddress));
+ List<PushedFlowMod> pushedFlowMods = installPath(shortestPath.flowEntries(), dstMacAddress);
+ pushedFlows.addAll(pushedFlowMods);
}
path.setFlowMods(pushedFlows);
@@ -781,24 +753,10 @@
fm.setMatch(match);
- IOFSwitch sw = floodlightProvider.getSwitches().get(flowEntry.dpid().value());
+ flowMods.add(new PushedFlowMod(flowEntry.dpid().value(), fm));
- if (sw == null){
- log.warn("Switch not found when pushing flow mod");
- continue;
- }
-
- flowMods.add(new PushedFlowMod(sw.getId(), fm));
-
- List<OFMessage> msglist = new ArrayList<OFMessage>();
- msglist.add(fm);
- try {
- sw.write(msglist, null);
- sw.flush();
- } catch (IOException e) {
- log.error("Failure writing flow mod", e);
- }
-
+ flowCache.write(flowEntry.dpid().value(), fm);
+
try {
fm = fm.clone();
} catch (CloneNotSupportedException e1) {
@@ -852,7 +810,7 @@
//Common match fields
forwardMatchSrc.setDataLayerType(Ethernet.TYPE_IPv4);
forwardMatchSrc.setNetworkProtocol(IPv4.PROTOCOL_TCP);
- forwardMatchSrc.setTransportDestination(BGP_PORT);
+ //forwardMatchSrc.setTransportDestination(BGP_PORT);
forwardMatchSrc.setWildcards(forwardMatchSrc.getWildcards() & ~OFMatch.OFPFW_IN_PORT
& ~OFMatch.OFPFW_DL_TYPE & ~OFMatch.OFPFW_NW_PROTO);
@@ -934,29 +892,15 @@
((OFActionOutput)reverseIcmp.getActions().get(0))
.setPort(flowEntry.inPort().value());
reverseIcmp.setMatch(reverseIcmpMatch);
-
-
- IOFSwitch sw = floodlightProvider.getSwitches().get(flowEntry.dpid().value());
- if (sw == null) {
- log.warn("Switch not found when pushing BGP paths");
- return;
- }
-
- List<OFMessage> msgList = new ArrayList<OFMessage>(2);
- msgList.add(forwardFlowModSrc);
- msgList.add(forwardFlowModDst);
- msgList.add(reverseFlowModSrc);
- msgList.add(reverseFlowModDst);
- msgList.add(forwardIcmp);
- msgList.add(reverseIcmp);
-
- try {
- sw.write(msgList, null);
- sw.flush();
- } catch (IOException e) {
- log.error("Failure writing flow mod", e);
- }
+ List<OFFlowMod> flowModList = new ArrayList<OFFlowMod>(6);
+ flowModList.add(forwardFlowModSrc);
+ flowModList.add(forwardFlowModDst);
+ flowModList.add(reverseFlowModSrc);
+ flowModList.add(reverseFlowModDst);
+ flowModList.add(forwardIcmp);
+ flowModList.add(reverseIcmp);
+ flowCache.write(flowEntry.dpid().value(), flowModList);
}
}
}
@@ -1039,17 +983,7 @@
.setLengthU(OFFlowMod.MINIMUM_LENGTH + OFActionOutput.MINIMUM_LENGTH);
for (String strdpid : switches){
- IOFSwitch sw = floodlightProvider.getSwitches().get(HexString.toLong(strdpid));
- if (sw == null) {
- log.debug("Couldn't find switch to push ARP flow");
- }
- else {
- try {
- sw.write(fm, null);
- } catch (IOException e) {
- log.warn("Failure writing ARP flow to switch", e);
- }
- }
+ flowCache.write(HexString.toLong(strdpid), fm);
}
}
@@ -1100,22 +1034,13 @@
fmBDDP.setPriority(ARP_PRIORITY);
fmBDDP.setLengthU(OFFlowMod.MINIMUM_LENGTH + OFActionOutput.MINIMUM_LENGTH);
+ List<OFFlowMod> flowModList = new ArrayList<OFFlowMod>(3);
+ flowModList.add(fm);
+ flowModList.add(fmLLDP);
+ flowModList.add(fmBDDP);
+
for (String strdpid : switches){
- IOFSwitch sw = floodlightProvider.getSwitches().get(HexString.toLong(strdpid));
- if (sw == null) {
- log.debug("Couldn't find switch to push default deny flow");
- }
- else {
- List<OFMessage> msgList = new ArrayList<OFMessage>();
- msgList.add(fm);
- msgList.add(fmLLDP);
- msgList.add(fmBDDP);
- try {
- sw.write(msgList, null);
- } catch (IOException e) {
- log.warn("Failure writing default deny flow to switch", e);
- }
- }
+ flowCache.write(HexString.toLong(strdpid), flowModList);
}
}
@@ -1248,6 +1173,8 @@
if (!topologyReady) {
sw.clearAllFlowMods();
}
+
+ flowCache.switchConnected(sw);
}
@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/FlowCache.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/FlowCache.java
new file mode 100644
index 0000000..d1cb578
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/FlowCache.java
@@ -0,0 +1,211 @@
+package net.onrc.onos.ofcontroller.bgproute;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFSwitch;
+
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMessage;
+import org.openflow.protocol.OFPort;
+import org.openflow.util.HexString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Per-switch cache of the flow mods written through this class.
+ *
+ * Flow mods passed to write() are cloned and stored under the switch's DPID
+ * even when the switch is not currently connected, and delete() removes them
+ * again. switchConnected() clears the flow mods on a newly connected switch
+ * and re-sends everything cached for it.
+ *
+ * All public methods are synchronized on this instance.
+ */
+public class FlowCache {
+    private static final Logger log = LoggerFactory.getLogger(FlowCache.class);
+
+    private final IFloodlightProviderService floodlightProvider;
+
+    //Cached flow mods for each switch, keyed by DPID
+    private final Map<Long, List<OFFlowMod>> flowCache;
+
+    /*
+     * Orders flow mods by descending cookie (signed comparison). The cookies
+     * are compared directly instead of being subtracted, because the
+     * difference of two longs can overflow and flip the sign of the result.
+     */
+    private final Comparator<OFFlowMod> cookieComparator = new Comparator<OFFlowMod>() {
+        @Override
+        public int compare(OFFlowMod fm1, OFFlowMod fm2) {
+            long cookie1 = fm1.getCookie();
+            long cookie2 = fm2.getCookie();
+
+            if (cookie2 > cookie1) {
+                return 1;
+            }
+            else if (cookie2 < cookie1) {
+                return -1;
+            }
+            else {
+                return 0;
+            }
+        }
+    };
+
+    /**
+     * Creates an empty cache.
+     * @param floodlightProvider used to look up connected switches by DPID
+     */
+    public FlowCache(IFloodlightProviderService floodlightProvider) {
+        this.floodlightProvider = floodlightProvider;
+
+        flowCache = new HashMap<Long, List<OFFlowMod>>();
+    }
+
+    /**
+     * Caches a single flow mod for a switch and writes it to the switch if
+     * the provider currently lists that switch.
+     * @param dpid DPID of the target switch
+     * @param flowMod the flow mod to cache and write
+     */
+    public synchronized void write(long dpid, OFFlowMod flowMod) {
+        write(dpid, Collections.singletonList(flowMod));
+    }
+
+    /**
+     * Caches a list of flow mods for a switch and writes them to the switch
+     * if the provider currently lists that switch. The flow mods are cloned
+     * first, so the cache and the switch only ever see the clones.
+     * @param dpid DPID of the target switch
+     * @param flowMods the flow mods to cache and write
+     */
+    public synchronized void write(long dpid, List<OFFlowMod> flowMods) {
+        List<OFFlowMod> clones = new ArrayList<OFFlowMod>(flowMods.size());
+
+        //Somehow the OFFlowMods we get passed in will change later on.
+        //No idea how this happens, but we can just clone to prevent problems
+        try {
+            for (OFFlowMod fm : flowMods) {
+                clones.add(fm.clone());
+            }
+        } catch (CloneNotSupportedException e) {
+            //Anything cloned before the failure is still cached and written
+            log.debug("Clone exception", e);
+        }
+
+        //Cache first so the flows are replayed if the switch connects later
+        cacheForSwitch(dpid).addAll(clones);
+
+        IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+
+        if (sw == null) {
+            log.debug("Switch not found when writing flow mods");
+            return;
+        }
+
+        List<OFMessage> msgList = new ArrayList<OFMessage>(clones.size());
+        msgList.addAll(clones);
+
+        try {
+            sw.write(msgList, null);
+        } catch (IOException e) {
+            log.error("Error writing to switch", e);
+        }
+    }
+
+    /**
+     * Removes a single flow mod from the cache and sends the corresponding
+     * delete to the switch if the provider currently lists that switch.
+     * The passed-in flow mod is altered into a delete flow mod.
+     * @param dpid DPID of the target switch
+     * @param flowMod the flow mod to remove
+     */
+    public synchronized void delete(long dpid, OFFlowMod flowMod) {
+        delete(dpid, Collections.singletonList(flowMod));
+    }
+
+    /**
+     * Removes flow mods from the cache and sends the corresponding deletes
+     * to the switch if the provider currently lists that switch.
+     * The passed-in flow mods are altered into delete flow mods.
+     * @param dpid DPID of the target switch
+     * @param flowMods the flow mods to remove
+     */
+    public synchronized void delete(long dpid, List<OFFlowMod> flowMods) {
+        //Remove the flow mods from the cache first before we alter them
+        cacheForSwitch(dpid).removeAll(flowMods);
+
+        //Alter the original flow mods to make them delete flow mods
+        for (OFFlowMod fm : flowMods) {
+            fm.setCommand(OFFlowMod.OFPFC_DELETE_STRICT)
+                .setOutPort(OFPort.OFPP_NONE)
+                .setLengthU(OFFlowMod.MINIMUM_LENGTH);
+
+            fm.getActions().clear();
+        }
+
+        IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+        if (sw == null) {
+            log.debug("Switch not found when writing flow mods");
+            return;
+        }
+
+        List<OFMessage> msgList = new ArrayList<OFMessage>(flowMods.size());
+        msgList.addAll(flowMods);
+
+        try {
+            sw.write(msgList, null);
+        } catch (IOException e) {
+            log.error("Error writing to switch", e);
+        }
+    }
+
+    //TODO can the Prontos handle being sent all flow mods in one message?
+    /**
+     * Clears all flow mods on a newly connected switch and re-sends every
+     * flow mod cached for it, ordered by descending cookie.
+     * @param sw the switch that has connected
+     */
+    public synchronized void switchConnected(IOFSwitch sw) {
+        log.debug("Switch connected: {}", sw);
+
+        List<OFFlowMod> flowMods = cacheForSwitch(sw.getId());
+
+        Collections.sort(flowMods, cookieComparator);
+
+        sw.clearAllFlowMods();
+
+        List<OFMessage> messages = new ArrayList<OFMessage>(flowMods.size());
+        messages.addAll(flowMods);
+
+        try {
+            sw.write(messages, null);
+        } catch (IOException e) {
+            //Pass the exception as the last argument so the cause is logged
+            log.error("Failure writing flow mods to switch {}",
+                    HexString.toHexString(sw.getId()), e);
+        }
+    }
+
+    /**
+     * Returns the cached flow mod list for a switch, creating an empty one
+     * if the switch has no entry yet.
+     * @param dpid DPID of the switch
+     * @return the mutable list of flow mods cached for the switch
+     */
+    private List<OFFlowMod> cacheForSwitch(long dpid) {
+        List<OFFlowMod> cached = flowCache.get(dpid);
+        if (cached == null) {
+            cached = new ArrayList<OFFlowMod>();
+            flowCache.put(dpid, cached);
+        }
+        return cached;
+    }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index 31cc4fc..b6a9591 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -224,7 +224,7 @@
return Command.CONTINUE;
}
- protected void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp) {
+ private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp) {
if (log.isTraceEnabled()) {
log.trace("ARP request received for {}",
inetAddressToString(arp.getTargetProtocolAddress()));
@@ -277,7 +277,7 @@
}
}
- protected void handleArpReply(IOFSwitch sw, OFPacketIn pi, ARP arp){
+ private void handleArpReply(IOFSwitch sw, OFPacketIn pi, ARP arp){
if (log.isTraceEnabled()) {
log.trace("ARP reply recieved: {} => {}, on {}/{}", new Object[] {
inetAddressToString(arp.getSenderProtocolAddress()),