Tightened up the handling of ARP responses to prevent race conditions causing the PTrie and switch flows to get out of sync
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index 9123792..caa8e49 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -3,7 +3,6 @@
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -130,6 +129,8 @@
protected SetMultimap<InetAddress, RibUpdate> prefixesWaitingOnArp;
protected SetMultimap<InetAddress, PathUpdate> pathsWaitingOnArp;
+ protected ExecutorService bgpUpdatesExecutor;
+
protected Multimap<Prefix, PushedFlowMod> pushedFlows;
private class PushedFlowMod {
@@ -279,6 +280,9 @@
pushedFlows = HashMultimap.<Prefix, PushedFlowMod>create();
+ bgpUpdatesExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
+
//Read in config values
bgpdRestIp = context.getConfigParams(this).get("BgpdRestIp");
if (bgpdRestIp == null){
@@ -413,6 +417,7 @@
*/
//TODO once the Ptree is object oriented this can go
+ /*
private String getPrefixFromPtree(PtreeNode node){
InetAddress address = null;
try {
@@ -424,6 +429,7 @@
}
return address.toString() + "/" + node.rib.masklen;
}
+ */
private void retrieveRib(){
String url = "http://" + bgpdRestIp + "/wm/bgp/" + routerId;
@@ -475,9 +481,14 @@
node.rib = rib;
*/
- ptree.put(p, rib);
+ //ptree.put(p, rib);
- addPrefixFlows(p, rib);
+ //addPrefixFlows(p, rib);
+ try {
+ ribUpdates.put(new RibUpdate(Operation.UPDATE, p, rib));
+ } catch (InterruptedException e) {
+ log.debug("Interrupted while pushing onto update queue");
+ }
}
}
@@ -486,14 +497,14 @@
ribUpdates.add(update);
}
- public void processRibAdd(RibUpdate update) {
+ public synchronized void processRibAdd(RibUpdate update) {
Prefix prefix = update.getPrefix();
//PtreeNode node = ptree.acquire(prefix.getAddress(), prefix.getPrefixLength());
Rib rib = ptree.put(prefix, update.getRibEntry());
//if (node.rib != null) {
- if (rib != null) {
+ if (rib != null && !rib.equals(update.getRibEntry())) {
//There was an existing nexthop for this prefix. This update supersedes that,
//so we need to remove the old flows for this prefix from the switches
deletePrefixFlows(prefix);
@@ -510,7 +521,7 @@
addPrefixFlows(prefix, update.getRibEntry());
}
- public void processRibDelete(RibUpdate update) {
+ public synchronized void processRibDelete(RibUpdate update) {
Prefix prefix = update.getPrefix();
//PtreeNode node = ptree.lookup(prefix.getAddress(), prefix.getPrefixLength());
@@ -639,6 +650,7 @@
match.setDataLayerType(Ethernet.TYPE_IPv4);
match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ /*
InetAddress address = null;
try {
address = InetAddress.getByAddress(prefix.getAddress());
@@ -646,10 +658,11 @@
//Should never happen is the reverse conversion has already been done
log.error("Malformed IP address");
return;
- }
+ }*/
- match.setFromCIDR(address.getHostAddress() + "/" +
- prefix.getPrefixLength(), OFMatch.STR_NW_DST);
+ //match.setFromCIDR(address.getHostAddress() + "/" +
+ // prefix.getPrefixLength(), OFMatch.STR_NW_DST);
+ match.setFromCIDR(prefix.toString(), OFMatch.STR_NW_DST);
fm.setMatch(match);
//Set up MAC rewrite action
@@ -991,11 +1004,31 @@
Set<RibUpdate> prefixesToPush = prefixesWaitingOnArp.removeAll(ipAddress);
- for (RibUpdate update : prefixesToPush) {
- //These will always be adds
- log.debug("Pushing prefix {} next hop {}", update.getPrefix(),
- update.getRibEntry().nextHop.getHostAddress());
- addPrefixFlows(update.getPrefix(), update.getRibEntry());
+ /*
+ * We synchronize on this to prevent changes to the ptree while we're pushing
+ * flows to the switches. If the ptree changes, the ptree and switches
+ * could get out of sync.
+ */
+ synchronized (this) {
+ for (RibUpdate update : prefixesToPush) {
+ //These will always be adds
+
+ //addPrefixFlows(update.getPrefix(), update.getRibEntry());
+ //processRibAdd(update);
+ Rib rib = ptree.lookup(update.getPrefix());
+ if (rib != null && rib.equals(update.getRibEntry())) {
+ log.debug("Pushing prefix {} next hop {}", update.getPrefix(),
+ rib.nextHop.getHostAddress());
+ //We only push prefix flows if the prefix is still in the ptree
+ //and the next hop is the same as our update. The prefix could
+ //have been removed while we were waiting for the ARP, or the
+ //next hop could have changed.
+ addPrefixFlows(update.getPrefix(), rib);
+ } else {
+ log.debug("Received ARP response, but {},{} is no longer in ptree",
+ update.getPrefix(), update.getRibEntry());
+ }
+ }
}
}
@@ -1013,6 +1046,7 @@
}
*/
+ /*
synchronized (ptree) {
Iterator<IPatriciaTrie.Entry> it = ptree.iterator();
while (it.hasNext()) {
@@ -1020,6 +1054,14 @@
addPrefixFlows(entry.getPrefix(), entry.getRib());
}
}
+ */
+
+ bgpUpdatesExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ doUpdatesThread();
+ }
+ });
}
@@ -1078,17 +1120,6 @@
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, proxyArp);
- ExecutorService e = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
-
-
- e.execute(new Runnable() {
- @Override
- public void run() {
- doUpdatesThread();
- }
- });
-
//Retrieve the RIB from BGPd during startup
retrieveRib();
}