Added thread-based code for measuring the the Shortest Path
computation.
NOTE: This code is added for reference purpose only, and will
be removed.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index a00d5d9..51e0509 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -8,9 +8,12 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
@@ -93,6 +96,38 @@
private final ScheduledExecutorService mapReaderScheduler =
Executors.newScheduledThreadPool(1);
+ private BlockingQueue<Runnable> shortestPathQueue = new LinkedBlockingQueue<Runnable>();
+ private ThreadPoolExecutor shortestPathExecutor =
+ new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, shortestPathQueue);
+
+ class ShortestPathTask implements Runnable {
+ private int hint;
+ private ITopoRouteService topoRouteService;
+ private ArrayList<DataPath> dpList;
+
+ public ShortestPathTask(int hint,
+ ITopoRouteService topoRouteService,
+ ArrayList<DataPath> dpList) {
+ this.hint = hint;
+ this.topoRouteService = topoRouteService;
+ this.dpList = dpList;
+ }
+
+ @Override
+ public void run() {
+ String logMsg = "MEASUREMENT: Running Thread hint " + this.hint;
+ log.debug(logMsg);
+ long startTime = System.nanoTime();
+ for (DataPath dp : this.dpList) {
+ topoRouteService.getShortestPath(dp.srcPort(), dp.dstPort());
+ }
+ long estimatedTime = System.nanoTime() - startTime;
+ double rate = (estimatedTime > 0)? ((double)dpList.size() * 1000000000) / estimatedTime: 0.0;
+ logMsg = "MEASUREMENT: Computed Thread hint " + hint + ": " + dpList.size() + " shortest paths in " + (double)estimatedTime / 1000000000 + " sec: " + rate + " flows/s";
+ log.debug(logMsg);
+ }
+ }
+
final Runnable measureShortestPath = new Runnable() {
public void run() {
log.debug("Recomputing Shortest Paths from the Network Map Flows...");
@@ -108,14 +143,23 @@
return;
}
+ int leftoverQueueSize = shortestPathExecutor.getQueue().size();
+ if (leftoverQueueSize > 0) {
+ String logMsg = "MEASUREMENT: Leftover Shortest Path Queue Size: " + leftoverQueueSize;
+ log.debug(logMsg);
+ return;
+ }
+ log.debug("MEASUREMENT: Beginning Shortest Path Computation");
+
//
// Recompute the Shortest Paths for all Flows
//
int counter = 0;
+ int hint = 0;
+ ArrayList<DataPath> dpList = new ArrayList<DataPath>();
long startTime = System.nanoTime();
Iterable<IFlowPath> allFlowPaths = conn.utils().getAllFlowPaths(conn);
for (IFlowPath flowPathObj : allFlowPaths) {
- counter++;
FlowId flowId = new FlowId(flowPathObj.getFlowId());
// log.debug("Found Path {}", flowId.toString());
@@ -125,9 +169,44 @@
Port dstPort = new Port(flowPathObj.getDstPort());
SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
+ DataPath dp = new DataPath();
+ dp.setSrcPort(srcSwitchPort);
+ dp.setDstPort(dstSwitchPort);
+ dpList.add(dp);
+ if ((dpList.size() % 10) == 0) {
+ shortestPathExecutor.execute(
+ new ShortestPathTask(hint, topoRouteService,
+ dpList));
+ dpList = new ArrayList<DataPath>();
+ hint++;
+ }
+ /*
DataPath dataPath =
topoRouteService.getShortestPath(srcSwitchPort,
dstSwitchPort);
+ */
+
+ /*
+ shortestPathExecutor.execute(
+ new ShortestPathTask(topoRouteService,
+ srcSwitchPort,
+ dstSwitchPort));
+ */
+ counter++;
+ }
+ if (dpList.size() > 0) {
+ shortestPathExecutor.execute(
+ new ShortestPathTask(hint, topoRouteService,
+ dpList));
+ }
+
+ // Wait for all tasks to finish
+ try {
+ while (shortestPathExecutor.getQueue().size() > 0) {
+ Thread.sleep(100);
+ }
+ } catch (InterruptedException ex) {
+ log.debug("MEASUREMENT: Shortest Path Computation interrupted");
}
conn.endTx(Transaction.COMMIT);