* Refactor and clean up the processing of events inside the FlowEventHandler.
* Refactor and clean up the pushing of the modified Flow Entries inside
the FlowManager.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index cb1e678..f3e47f4 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -10,6 +10,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import net.floodlightcontroller.core.IOFSwitch;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
@@ -60,6 +61,17 @@
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ //
+ // Transient state for processing the Flow Paths (reset by processEvents()):
+ // - The new Flow Paths (added to allFlowPaths by processEvents())
+ // - The Flow Paths that should be recomputed
+ // - The Flow Paths with modified Flow Entries
+ //
+ private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+
+
/**
* Constructor for a given Flow Manager and Datagrid Service.
*
@@ -81,10 +93,9 @@
protected Topology getTopology() { return this.topology; }
/**
- * Run the thread.
+ * Startup processing.
*/
- @Override
- public void run() {
+ private void startup() {
//
// Obtain the initial Topology state
//
@@ -116,6 +127,14 @@
// Process the initial events (if any)
processEvents();
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ startup();
//
// The main loop
@@ -162,17 +181,108 @@
* Process the events (if any)
*/
private void processEvents() {
- List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
- List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
- List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+ List<FlowEntry> modifiedFlowEntries;
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
}
+ processFlowPathEvents();
+ processTopologyEvents();
//
- // Process the Flow Path events
+ // Add all new Flows to allFlowPaths: should be done after processing the
+ // Flow Path and Topology events.
+ //
+ for (FlowPath flowPath : newFlowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ processFlowEntryEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified ones
+ for (FlowPath flowPath : recomputeFlowPaths) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.add(flowPath);
+ }
+
+ modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
+
+ // Assign missing Flow Entry IDs (only for the Flow Entries on my switches)
+ assignFlowEntryId(modifiedFlowEntries);
+
+ //
+ // Push the modified Flow Entries to switches, datagrid and database
+ //
+ flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowPaths);
+ flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
+
+ //
+ // Remove the deleted Flow Entries from each modified Flow Path
+ //
+ for (FlowPath flowPath : modifiedFlowPaths)
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ // Cleanup: discard the processed events
+ topologyEvents.clear();
+ flowPathEvents.clear();
+ flowEntryEvents.clear();
+ // ... and reset the transient Flow Path state
+ newFlowPaths.clear();
+ recomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+ }
+
+ /**
+ * Extract the Flow Entries in state FE_SWITCH_NOT_UPDATED from the given Flow Paths.
+ */
+ private List<FlowEntry> extractModifiedFlowEntries(
+ List<FlowPath> modifiedFlowPaths) { // NOTE: shadows the field of the same name
+ List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
+ // Keep only the Flow Entries whose switch state is FE_SWITCH_NOT_UPDATED
+ for (FlowPath flowPath : modifiedFlowPaths) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() ==
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ modifiedFlowEntries.add(flowEntry);
+ }
+ }
+ }
+ return modifiedFlowEntries;
+ }
+
+ /**
+ * Assign a Flow Entry ID to each Flow Entry on my switches that lacks a valid one.
+ */
+ private void assignFlowEntryId(List<FlowEntry> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return; // Nothing to do
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ //
+ // Assign a Flow Entry ID only to the Flow Entries for my switches
+ //
+ for (FlowEntry flowEntry : modifiedFlowEntries) {
+ // Skip the Flow Entries whose switch (looked up by DPID) is not mine
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ /**
+ * Process the Flow Path events.
+ */
+ private void processFlowPathEvents() {
+ //
+ // Process all Flow Path events and update the appropriate state
//
for (EventEntry<FlowPath> eventEntry : flowPathEvents) {
FlowPath flowPath = eventEntry.eventData();
@@ -243,9 +353,14 @@
}
}
}
+ }
+ /**
+ * Process the Topology events.
+ */
+ private void processTopologyEvents() {
//
- // Process the topology events
+ // Process all Topology events and update the appropriate state
//
boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
@@ -267,22 +382,17 @@
// TODO: For now, if the topology changes, we recompute all Flows
recomputeFlowPaths.addAll(allFlowPaths.values());
}
+ }
- // Add all new Flows
- for (FlowPath flowPath : newFlowPaths) {
- allFlowPaths.put(flowPath.flowId().value(), flowPath);
- }
-
- // Recompute all affected Flow Paths and keep only the modified
- for (FlowPath flowPath : recomputeFlowPaths) {
- if (recomputeFlowPath(flowPath))
- modifiedFlowPaths.add(flowPath);
- }
-
+ /**
+ * Process the Flow Entry events.
+ */
+ private void processFlowEntryEvents() {
//
- // Process previously unmatched Flow Entry updates
+ // Update Flow Entries with previously unmatched Flow Entry updates
//
- if ((! flowPathEvents.isEmpty()) && (! unmatchedFlowEntryUpdates.isEmpty())) {
+ if ((! flowPathEvents.isEmpty()) &&
+ (! unmatchedFlowEntryUpdates.isEmpty())) {
List<FlowEntry> remainingUpdates = new LinkedList<FlowEntry>();
for (FlowEntry flowEntry : unmatchedFlowEntryUpdates) {
if (! updateFlowEntry(flowEntry))
@@ -292,7 +402,7 @@
}
//
- // Process the Flow Entry events
+ // Process all Flow Entry events and update the appropriate state
//
for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
FlowEntry flowEntry = eventEntry.eventData();
@@ -315,16 +425,6 @@
break;
}
}
-
- //
- // Push the Flow Entries that have been modified
- //
- flowManager.pushModifiedFlowEntries(modifiedFlowPaths);
-
- // Cleanup
- topologyEvents.clear();
- flowPathEvents.clear();
- flowEntryEvents.clear();
}
/**