Misc. bug fixes and optimizations:
* Speed-up the implementation of deleteAllFlows() and clearAllFlows():
fetch only the FlowId of all installed flows, instead of
all Flow state.
* Add a commented-out code inside deleteAllFlows() that can be
used to delete all Flows by using multiple threads.
* Few bug fixes and extra log messages related to the measurement code
for adding Flow Entries.
* Print the stack trace if there is an exception when adding a Flow.
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index 251620c..57c136c 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -1,6 +1,8 @@
package net.floodlightcontroller.flowcache;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -11,6 +13,7 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -93,7 +96,6 @@
private long modifiedMeasurementFlowTime = 0;
//
private LinkedList<FlowPath> measurementStoredPaths = new LinkedList<FlowPath>();
- private LinkedList<FlowPath> measurementProcessingPaths = null;
private long measurementStartTimeProcessingPaths = 0;
private long measurementEndTimeProcessingPaths = 0;
@@ -537,8 +539,14 @@
} catch (Exception e) {
// TODO: handle exceptions
conn.endTx(Transaction.ROLLBACK);
- log.error(":addFlow FlowId:{} failed",
- flowPath.flowId().toString());
+
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ String stacktrace = sw.toString();
+
+ log.error(":addFlow FlowId:{} failed: {}",
+ flowPath.flowId().toString(),
+ stacktrace);
}
if (flowObj == null) {
log.error(":addFlow FlowId:{} failed: Flow object not created",
@@ -757,13 +765,63 @@
*/
@Override
public boolean deleteAllFlows() {
+ List<Thread> threads = new LinkedList<Thread>();
+ final ConcurrentLinkedQueue<FlowId> concurrentAllFlowIds =
+ new ConcurrentLinkedQueue<FlowId>();
- // Get all flows and delete them one-by-one
- ArrayList<FlowPath> allFlows = getAllFlows();
- for (FlowPath flowPath : allFlows) {
- deleteFlow(flowPath.flowId());
+ // Get all Flow IDs
+ Iterable<IFlowPath> allFlowPaths = conn.utils().getAllFlowPaths(conn);
+ for (IFlowPath flowPathObj : allFlowPaths) {
+ if (flowPathObj == null)
+ continue;
+ String flowIdStr = flowPathObj.getFlowId();
+ if (flowIdStr == null)
+ continue;
+ FlowId flowId = new FlowId(flowIdStr);
+ concurrentAllFlowIds.add(flowId);
}
+ // Delete all flows one-by-one
+ for (FlowId flowId : concurrentAllFlowIds)
+ deleteFlow(flowId);
+
+ /*
+ * TODO: A faster mechanism to delete the Flow Paths by using
+ * a number of threads. Commented-out for now.
+ */
+ /*
+ //
+ // Create the threads to delete the Flow Paths
+ //
+ for (int i = 0; i < 10; i++) {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (true) {
+ FlowId flowId = concurrentAllFlowIds.poll();
+ if (flowId == null)
+ return;
+ deleteFlow(flowId);
+ }
+ }}, "Delete All Flow Paths");
+ threads.add(thread);
+ }
+
+ // Start processing
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // Want for all threads to complete
+ for (Thread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ log.debug("Exception waiting for a thread to delete a Flow Path: ", e);
+ }
+ }
+ */
+
return true;
}
@@ -836,11 +894,23 @@
*/
@Override
public boolean clearAllFlows() {
+ List<FlowId> allFlowIds = new LinkedList<FlowId>();
- // Get all flows and clear them one-by-one
- ArrayList<FlowPath> allFlows = getAllFlows();
- for (FlowPath flowPath : allFlows) {
- clearFlow(flowPath.flowId());
+ // Get all Flow IDs
+ Iterable<IFlowPath> allFlowPaths = conn.utils().getAllFlowPaths(conn);
+ for (IFlowPath flowPathObj : allFlowPaths) {
+ if (flowPathObj == null)
+ continue;
+ String flowIdStr = flowPathObj.getFlowId();
+ if (flowIdStr == null)
+ continue;
+ FlowId flowId = new FlowId(flowIdStr);
+ allFlowIds.add(flowId);
+ }
+
+ // Clear all flows one-by-one
+ for (FlowId flowId : allFlowIds) {
+ clearFlow(flowId);
}
return true;
@@ -1816,7 +1886,7 @@
* @return the stored shortest-path flow on success, otherwise null.
*/
@Override
- public FlowPath measurementStorePathFlow(FlowPath flowPath) {
+ public synchronized FlowPath measurementStorePathFlow(FlowPath flowPath) {
//
// Prepare the Shortest Path computation if the first Flow Path
//
@@ -1867,10 +1937,13 @@
computedFlowPath.setFlowEntryMatch(new FlowEntryMatch(flowPath.flowEntryMatch()));
//
- // Add the computed Flow Path the the internal storage
+ // Add the computed Flow Path to the internal storage
//
measurementStoredPaths.add(computedFlowPath);
+ log.debug("Measurement storing path {}",
+ computedFlowPath.flowId().toString());
+
return (computedFlowPath);
}
@@ -1885,8 +1958,12 @@
public boolean measurementInstallPaths(Integer numThreads) {
List<Thread> threads = new LinkedList<Thread>();
- // Create a copy of the paths to install
- measurementProcessingPaths = new LinkedList<FlowPath>(measurementStoredPaths);
+ // Create a copy of the Flow Paths to install
+ final ConcurrentLinkedQueue<FlowPath> measurementProcessingPaths =
+ new ConcurrentLinkedQueue<FlowPath>(measurementStoredPaths);
+
+ log.debug("Measurement Installing {} flows",
+ measurementProcessingPaths.size());
//
// Create the threads to install the Flow Paths
@@ -1896,7 +1973,7 @@
@Override
public void run() {
while (true) {
- FlowPath flowPath = measurementPollFirstFlowPath();
+ FlowPath flowPath = measurementProcessingPaths.poll();
if (flowPath == null)
return;
// Install the Flow Path
@@ -1918,17 +1995,18 @@
thread.start();
}
- //
- // Wait until the end of processing time
- //
- while (measurementEndTimeProcessingPaths == 0) {
+ // Want for all threads to complete
+ for (Thread thread : threads) {
try {
- Thread.sleep(100);
+ thread.join();
} catch (InterruptedException e) {
- // Continue waiting
+ log.debug("Exception waiting for a thread to install a Flow Path: ", e);
}
}
+ // Record the end of processing
+ measurementEndTimeProcessingPaths = System.nanoTime();
+
return true;
}
@@ -1945,24 +2023,6 @@
}
/**
- * Get a Flow Path that needs to be installed for measurement purpose.
- *
- * If there is no next Flow Path to install, the end time measurement
- * is recorded.
- *
- * @return the next Flow Path to install if exists, otherwise null.
- */
- private synchronized FlowPath measurementPollFirstFlowPath() {
- FlowPath flowPath = measurementProcessingPaths.pollFirst();
-
- // Record the end of processing, if the first call
- if ((flowPath == null) && (measurementEndTimeProcessingPaths == 0))
- measurementEndTimeProcessingPaths = System.nanoTime();
-
- return flowPath;
- }
-
- /**
* Clear the path flows stored for measurement purpose.
*
* @return true on success, otherwise false.
@@ -1976,5 +2036,4 @@
return true;
}
-
}