Added a producer-consumer queue between receiving route updates and processing them, so that updates are received quickly enough to keep BGPd happy
diff --git a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
index 7848d34..acdf185 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
@@ -10,7 +10,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -61,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
public class BgpRoute implements IFloodlightModule, IBgpRouteService,
ITopologyListener, IOFSwitchListener {
@@ -75,6 +80,8 @@
protected ProxyArpManager proxyArp;
protected static Ptree ptree;
+ protected BlockingQueue<RibUpdate> ribUpdates;
+
protected String bgpdRestIp;
protected String routerId;
protected String configFilename = "config.json";
@@ -119,9 +126,9 @@
ITopoLinkService topoLinkService = new TopoLinkServiceImpl();
List<Link> activeLinks = topoLinkService.getActiveLinks();
- for (Link l : activeLinks){
+ //for (Link l : activeLinks){
//log.debug("active link: {}", l);
- }
+ //}
Iterator<LDUpdate> it = linkUpdates.iterator();
while (it.hasNext()){
@@ -139,12 +146,12 @@
if (linkUpdates.isEmpty()){
//All updates have been seen in network map.
//We can check if topology is ready
- log.debug("No know changes outstanding. Checking topology now");
+ log.debug("No known changes outstanding. Checking topology now");
checkStatus();
}
else {
//We know of some link updates that haven't propagated to the database yet
- log.debug("Some changes not found in network map- size {}", linkUpdates.size());
+ log.debug("Some changes not found in network map - {} links missing", linkUpdates.size());
topologyChangeDetectorTask.reschedule(TOPO_DETECTION_WAIT, TimeUnit.SECONDS);
}
}
@@ -215,6 +222,8 @@
throws FloodlightModuleException {
ptree = new Ptree(32);
+
+ ribUpdates = new LinkedBlockingQueue<RibUpdate>();
// Register floodlight provider and REST handler.
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
@@ -423,6 +432,53 @@
}
}
+ /**
+  * Queues a RIB update for asynchronous processing.
+  *
+  * The update is appended to {@code ribUpdates}; {@code doUpdatesThread}
+  * takes entries from that queue and applies them.
+  *
+  * @param update the RIB update to enqueue
+  */
+ @Override
+ public void newRibUpdate(RibUpdate update) {
+     this.ribUpdates.add(update);
+ }
+
+ //TODO temporary
+ /**
+  * Stores the RIB entry carried by an update in the Ptree node for its
+  * prefix and then hands that node to {@link #prefixAdded(PtreeNode)}.
+  *
+  * @param update the update whose prefix and RIB entry are applied
+  */
+ public void wrapPrefixAdded(RibUpdate update) {
+     final Prefix target = update.getPrefix();
+     final PtreeNode entry = ptree.acquire(target.getAddress(), target.getPrefixLength());
+
+     // The node already holds a rib: clear it and release one reference
+     // before storing the new entry.
+     // NOTE(review): presumably this balances the reference taken by
+     // acquire() above - confirm against Ptree.
+     if (entry.rib != null) {
+         entry.rib = null;
+         ptree.delReference(entry);
+     }
+
+     entry.rib = update.getRibEntry();
+     prefixAdded(entry);
+ }
+
+ //TODO temporary
+ /**
+  * Handles a delete update: looks up the Ptree node for the update's prefix,
+  * passes it to {@link #prefixDeleted(PtreeNode)} and, if the update's RIB
+  * entry equals the one stored on the node, clears the node's rib and
+  * releases a reference to the node.
+  *
+  * @param update the delete update to apply
+  */
+ public void wrapPrefixDeleted(RibUpdate update) {
+     final Prefix target = update.getPrefix();
+     final PtreeNode found = ptree.lookup(target.getAddress(), target.getPrefixLength());
+
+     /*
+      * Theory: we could get a delete for a prefix not in the Ptree, which
+      * would result in a null node being returned. We could also get a
+      * delete for a node that's not actually there, but is an aggregate
+      * node; that would result in a non-null node with a null rib.
+      * Only a non-null node with a non-null rib is an actual prefix in
+      * the Ptree, so anything else is ignored.
+      */
+     if (found == null || found.rib == null) {
+         return;
+     }
+
+     // Remove the flows from the switches before the rib is lost.
+     // NOTE(review): this runs even when the update's RIB entry does not
+     // match the stored one (checked only below) - confirm that is intended.
+     prefixDeleted(found);
+
+     // The rib is read again here, after prefixDeleted() has run.
+     if (found.rib != null && update.getRibEntry().equals(found.rib)) {
+         found.rib = null;
+         ptree.delReference(found);
+     }
+ }
+
+ @Override
public void prefixAdded(PtreeNode node) {
if (!topologyReady){
return;
@@ -549,6 +605,7 @@
}
//TODO this is largely untested
+ @Override
public void prefixDeleted(PtreeNode node) {
if (!topologyReady) {
return;
@@ -915,9 +972,45 @@
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, proxyArp);
+ ExecutorService e = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("bgp-updates-%d").build());
+
+
+ e.execute(new Runnable() {
+ @Override
+ public void run() {
+ doUpdatesThread();
+ }
+ });
+
//Retrieve the RIB from BGPd during startup
retrieveRib();
}
+
+ /**
+  * Consumer loop for the RIB update queue.
+  *
+  * Blocks on {@code ribUpdates} and applies each update in arrival order:
+  * UPDATE operations go to {@link #wrapPrefixAdded(RibUpdate)} and DELETE
+  * operations go to {@link #wrapPrefixDeleted(RibUpdate)}.
+  *
+  * The loop ends only when the thread is interrupted while waiting for the
+  * next update; the interrupt status is restored before returning so the
+  * caller can observe it. A failure while applying a single update is
+  * logged and the loop moves on to the next one, so that one bad update
+  * does not stop all later updates from being processed.
+  */
+ private void doUpdatesThread() {
+     try {
+         while (true) {
+             RibUpdate update = ribUpdates.take();
+             try {
+                 switch (update.getOperation()) {
+                 case UPDATE:
+                     wrapPrefixAdded(update);
+                     break;
+                 case DELETE:
+                     wrapPrefixDeleted(update);
+                     break;
+                 default:
+                     log.warn("Ignoring RIB update with unsupported operation {}",
+                             update.getOperation());
+                     break;
+                 }
+             } catch (RuntimeException e) {
+                 // Keep the consumer alive: nothing else drains the queue.
+                 log.error("Failed to process RIB update; continuing with next update", e);
+             }
+         }
+     } catch (InterruptedException e) {
+         // Interruption is the shutdown signal: restore the flag and exit.
+         Thread.currentThread().interrupt();
+     }
+ }
@Override
public void topologyChanged() {