Speedup a bit the periodic flow and flow entry processing.
Split the thread that process the flows and the flow entries
into two threads: the first thread processes the flow entries;
the second thread processes the flow reconciliation.
Initial measurements show speed improvement in the range 20-30%.
E.g., from 18 seconds down to 13 seconds.
Reviewed by: Jono
diff --git a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
index 51c39a0..ace006d 100644
--- a/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
+++ b/src/main/java/net/floodlightcontroller/flowcache/FlowManager.java
@@ -102,13 +102,14 @@
private final ScheduledExecutorService mapReaderScheduler =
Executors.newScheduledThreadPool(1);
+ private final ScheduledExecutorService shortestPathReconcileScheduler =
+ Executors.newScheduledThreadPool(1);
+
final Runnable mapReader = new Runnable() {
public void run() {
long startTime = System.nanoTime();
int counterAllFlowEntries = 0;
int counterMyNotUpdatedFlowEntries = 0;
- int counterAllFlowPaths = 0;
- int counterMyFlowPaths = 0;
if (floodlightProvider == null) {
log.debug("FloodlightProvider service not found!");
@@ -120,7 +121,6 @@
new LinkedList<IFlowEntry>();
LinkedList<IFlowEntry> deleteFlowEntries =
new LinkedList<IFlowEntry>();
- LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
//
// Fetch all Flow Entries and select only my Flow Entries
@@ -218,6 +218,45 @@
conn.utils().removeFlowEntry(conn, flowEntryObj);
}
+ conn.endTx(Transaction.COMMIT);
+
+ if (processed_measurement_flow) {
+ long estimatedTime =
+ System.nanoTime() - modifiedMeasurementFlowTime;
+ String logMsg = "MEASUREMENT: Pushed Flow delay: " +
+ (double)estimatedTime / 1000000000 + " sec";
+ log.debug(logMsg);
+ }
+
+ long estimatedTime = System.nanoTime() - startTime;
+ double rate = 0.0;
+ if (estimatedTime > 0)
+ rate = ((double)counterAllFlowEntries * 1000000000) / estimatedTime;
+ String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
+ counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
+ counterMyNotUpdatedFlowEntries + " in " +
+ (double)estimatedTime / 1000000000 + " sec: " +
+ rate + " paths/s";
+ log.debug(logMsg);
+ }
+ };
+
+ final Runnable shortestPathReconcile = new Runnable() {
+ public void run() {
+ long startTime = System.nanoTime();
+ int counterAllFlowPaths = 0;
+ int counterMyFlowPaths = 0;
+
+ if (floodlightProvider == null) {
+ log.debug("FloodlightProvider service not found!");
+ return;
+ }
+ Map<Long, IOFSwitch> mySwitches =
+ floodlightProvider.getSwitches();
+ LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
+
+ boolean processed_measurement_flow = false;
+
//
// Fetch and recompute the Shortest Path for those
// Flow Paths this controller is responsible for.
@@ -336,9 +375,7 @@
double rate = 0.0;
if (estimatedTime > 0)
rate = ((double)counterAllFlowPaths * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
- counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
- counterMyNotUpdatedFlowEntries + " AllFlowPaths: " +
+ String logMsg = "MEASUREMENT: Processed AllFlowPaths: " +
counterAllFlowPaths + " MyFlowPaths: " +
counterMyFlowPaths + " in " +
(double)estimatedTime / 1000000000 + " sec: " +
@@ -350,6 +387,9 @@
final ScheduledFuture<?> mapReaderHandle =
mapReaderScheduler.scheduleAtFixedRate(mapReader, 3, 3, TimeUnit.SECONDS);
+ final ScheduledFuture<?> shortestPathReconcileHandle =
+ shortestPathReconcileScheduler.scheduleAtFixedRate(shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
+
@Override
public void init(String conf) {
conn = GraphDBConnection.getInstance(conf);
@@ -1295,73 +1335,14 @@
//
// Remove the old Flow Entries, and add the new Flow Entries
//
-
- //
- // Remove the Flow Entries from the Network MAP
- //
Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
LinkedList<IFlowEntry> deleteFlowEntries = new LinkedList<IFlowEntry>();
for (IFlowEntry flowEntryObj : flowEntries) {
- String dpidStr = flowEntryObj.getSwitchDpid();
- if (dpidStr == null)
- continue;
- Dpid dpid = new Dpid(dpidStr);
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
-
flowEntryObj.setUserState("FE_USER_DELETE");
- if (mySwitch == null) {
- //
- // Not my switch. Mark it for deletion in the Network MAP
- //
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- continue;
- }
-
- deleteFlowEntries.add(flowEntryObj);
-
- //
- // Delete the flow entry from the switch
- //
- // flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- // flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+ flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
}
- for (IFlowEntry flowEntryObj : deleteFlowEntries) {
- flowObj.removeFlowEntry(flowEntryObj);
- conn.utils().removeFlowEntry(conn, flowEntryObj);
- }
-
- //
- // Install the new shortest path into the Network MAP and the switches.
- //
for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
- IFlowEntry flowEntryObj = addFlowEntry(flowObj, flowEntry);
- if (flowEntryObj == null) {
- //
- // TODO: Remove the "new Object[] wrapper in the statement
- // below after the SLF4J logger is upgraded to
- // Version 1.7.5
- //
- log.error("Cannot add Flow Entry to switch {} for Path Flow from {} to {} : Flow Entry not in the Network MAP",
- new Object[] {
- flowEntry.dpid(),
- newDataPath.srcPort(),
- newDataPath.dstPort()
- });
- continue;
- }
-
- IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
- if (mySwitch == null) {
- // Not my switch: just add to the Network MAP
- continue;
- }
-
- // Install the Flow Entry into the switch
- if (installFlowEntry(mySwitch, flowObj, flowEntryObj)) {
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
- }
+ addFlowEntry(flowObj, flowEntry);
}
//