Added a producer-consumer queue between receiving route updates and processing them in order to speed up receiving the updates to keep BGPd happy
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index 7848d34..acdf185 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -10,7 +10,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -61,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class BgpRoute implements IFloodlightModule, IBgpRouteService,
ITopologyListener, IOFSwitchListener {
@@ -75,6 +80,8 @@
protected ProxyArpManager proxyArp;
protected static Ptree ptree;
+ protected BlockingQueue<RibUpdate> ribUpdates;
+
protected String bgpdRestIp;
protected String routerId;
protected String configFilename = "config.json";
@@ -119,9 +126,9 @@
ITopoLinkService topoLinkService = new TopoLinkServiceImpl();
List<Link> activeLinks = topoLinkService.getActiveLinks();
- for (Link l : activeLinks){
+ //for (Link l : activeLinks){
//log.debug("active link: {}", l);
- }
+ //}
Iterator<LDUpdate> it = linkUpdates.iterator();
while (it.hasNext()){
@@ -139,12 +146,12 @@
if (linkUpdates.isEmpty()){
//All updates have been seen in network map.
//We can check if topology is ready
- log.debug("No know changes outstanding. Checking topology now");
+ log.debug("No known changes outstanding. Checking topology now");
checkStatus();
}
else {
//We know of some link updates that haven't propagated to the database yet
- log.debug("Some changes not found in network map- size {}", linkUpdates.size());
+ log.debug("Some changes not found in network map - {} links missing", linkUpdates.size());
topologyChangeDetectorTask.reschedule(TOPO_DETECTION_WAIT, TimeUnit.SECONDS);
}
}
@@ -215,6 +222,8 @@
throws FloodlightModuleException {
ptree = new Ptree(32);
+
+ ribUpdates = new LinkedBlockingQueue<RibUpdate>();
// Register floodlight provider and REST handler.
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
@@ -423,6 +432,53 @@
}
}
+ @Override
+ public void newRibUpdate(RibUpdate update) {
+ ribUpdates.add(update);
+ }
+
+ //TODO temporary
+ public void wrapPrefixAdded(RibUpdate update) {
+ Prefix prefix = update.getPrefix();
+
+ PtreeNode node = ptree.acquire(prefix.getAddress(), prefix.getPrefixLength());
+
+ if (node.rib != null) {
+ node.rib = null;
+ ptree.delReference(node);
+ }
+ node.rib = update.getRibEntry();
+
+ prefixAdded(node);
+ }
+
+ //TODO temporary
+ public void wrapPrefixDeleted(RibUpdate update) {
+ Prefix prefix = update.getPrefix();
+
+ PtreeNode node = ptree.lookup(prefix.getAddress(), prefix.getPrefixLength());
+
+ /*
+ * Remove the flows from the switches before the rib is lost
+ * Theory: we could get a delete for a prefix not in the Ptree.
+ * This would result in a null node being returned. We could get a delete for
+ * a node that's not actually there, but is a aggregate node. This would result
+ * in a non-null node with a null rib. Only a non-null node with a non-null
+ * rib is an actual prefix in the Ptree.
+ */
+ if (node != null && node.rib != null){
+ prefixDeleted(node);
+ }
+
+ if (node != null && node.rib != null) {
+ if (update.getRibEntry().equals(node.rib)) {
+ node.rib = null;
+ ptree.delReference(node);
+ }
+ }
+ }
+
+ @Override
public void prefixAdded(PtreeNode node) {
if (!topologyReady){
return;
@@ -549,6 +605,7 @@
}
//TODO this is largely untested
+ @Override
public void prefixDeleted(PtreeNode node) {
if (!topologyReady) {
return;
@@ -915,9 +972,45 @@
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, proxyArp);
+ ExecutorService e = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
+
+
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ doUpdatesThread();
+ }
+ });
+
//Retrieve the RIB from BGPd during startup
retrieveRib();
}
+
+ private void doUpdatesThread() {
+ boolean interrupted = false;
+ try {
+ while (true) {
+ try {
+ RibUpdate update = ribUpdates.take();
+ switch (update.getOperation()){
+ case UPDATE:
+ wrapPrefixAdded(update);
+ break;
+ case DELETE:
+ wrapPrefixDeleted(update);
+ break;
+ }
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
@Override
public void topologyChanged() {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRouteResource.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRouteResource.java
index 6860f74..19b44c8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRouteResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRouteResource.java
@@ -2,6 +2,8 @@
import java.net.UnknownHostException;
+import net.onrc.onos.ofcontroller.bgproute.RibUpdate.Operation;
+
import org.restlet.resource.Delete;
import org.restlet.resource.Get;
import org.restlet.resource.Post;
@@ -100,7 +102,7 @@
IBgpRouteService bgpRoute = (IBgpRouteService) getContext().getAttributes().
get(IBgpRouteService.class.getCanonicalName());
- Ptree ptree = bgpRoute.getPtree();
+ //Ptree ptree = bgpRoute.getPtree();
String routerId = (String) getRequestAttributes().get("routerid");
String prefix = (String) getRequestAttributes().get("prefix");
@@ -125,9 +127,13 @@
return reply + "\n";
}
- PtreeNode node = ptree.acquire(p.getAddress(), p.getPrefixLength());
Rib rib = new Rib(routerId, nexthop, p.getPrefixLength());
+ bgpRoute.newRibUpdate(new RibUpdate(Operation.UPDATE, p, rib));
+
+ /*
+ PtreeNode node = ptree.acquire(p.getAddress(), p.getPrefixLength());
+
if (node.rib != null) {
node.rib = null;
ptree.delReference(node);
@@ -135,6 +141,7 @@
node.rib = rib;
bgpRoute.prefixAdded(node);
+ */
reply = "[POST: " + prefix + "/" + mask + ":" + nexthop + "]";
log.info(reply);
@@ -158,7 +165,7 @@
IBgpRouteService bgpRoute = (IBgpRouteService)getContext().getAttributes().
get(IBgpRouteService.class.getCanonicalName());
- Ptree ptree = bgpRoute.getPtree();
+ //Ptree ptree = bgpRoute.getPtree();
String routerId = (String) getRequestAttributes().get("routerid");
String prefix = (String) getRequestAttributes().get("prefix");
@@ -182,7 +189,12 @@
log.info(reply);
return reply + "\n";
}
-
+
+ Rib r = new Rib(routerId, nextHop, p.getPrefixLength());
+
+ bgpRoute.newRibUpdate(new RibUpdate(Operation.DELETE, p, r));
+
+ /*
PtreeNode node = ptree.lookup(p.getAddress(), p.getPrefixLength());
//Remove the flows from the switches before the rib is lost
@@ -195,7 +207,7 @@
bgpRoute.prefixDeleted(node);
}
- Rib r = new Rib(routerId, nextHop, p.getPrefixLength());
+
if (node != null && node.rib != null) {
if (r.equals(node.rib)) {
@@ -203,6 +215,7 @@
ptree.delReference(node);
}
}
+ */
reply =reply + "[DELE: " + prefix + "/" + mask + ":" + nextHop + "]";
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/IBgpRouteService.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/IBgpRouteService.java
index c84a415..3dbc940 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/IBgpRouteService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/IBgpRouteService.java
@@ -14,6 +14,12 @@
public void clearPtree();
+ /**
+ * Pass a RIB update to the {@link IBgpRouteService}
+ * @param update
+ */
+ public void newRibUpdate(RibUpdate update);
+
//TODO This functionality should be provided by some sort of Ptree listener framework
public void prefixAdded(PtreeNode node);
public void prefixDeleted(PtreeNode node);