Code refactoring to keep a local cache of the Flows that
are controlled by this instance.
NOTE: work in progress.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 8fc6a12..34958d8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -267,16 +267,12 @@
if (enableOnrc2014MeasurementsFlows) {
- if (topologyEvents.isEmpty() && flowIdEvents.isEmpty() &&
- flowEntryIdEvents.isEmpty()) {
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
return; // Nothing to do
}
Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
- // Fetch and prepare my flows
- prepareMyFlows(mySwitches);
-
// Process the Flow ID events
processFlowIdEvents(mySwitches);
@@ -299,21 +295,17 @@
}
}
- // Extract my modified Flow Entries
- modifiedFlowEntries = processFlowEntryIdEvents(mySwitches);
-
//
- // Push the modified state to the Flow Manager
+ // Push the modified state to the database
//
- flowManager.pushModifiedFlowState(modifiedFlowPaths.values(),
- modifiedFlowEntries);
+ flowManager.writeModifiedFlowPathsToDatabase(modifiedFlowPaths.values());
// Cleanup
topologyEvents.clear();
flowIdEvents.clear();
- flowEntryIdEvents.clear();
//
- allFlowPaths.clear();
+ // NOTE: Keep a cache with my Flow Paths
+ // allFlowPaths.clear();
shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
@@ -456,50 +448,40 @@
}
/**
- * Prepare my flows.
+ * Fix a flow fetched from the database.
*
- * @param mySwitches the collection of my switches.
+ * @param flowPath the Flow to fix.
*/
- private void prepareMyFlows(Map<Long, IOFSwitch> mySwitches) {
- if (! topologyEvents.isEmpty()) {
- // Fetch my flows from the database
- ArrayList<FlowPath> myFlows = FlowDatabaseOperation.getAllMyFlows(dbHandler, mySwitches);
- for (FlowPath flowPath : myFlows) {
- log.debug("Found my flow: {}", flowPath);
+ private void fixFlowFromDatabase(FlowPath flowPath) {
+ //
+ // TODO: Bug workaround / fix :
+ // method FlowDatabaseOperation.extractFlowEntry() doesn't
+ // fetch the inPort and outPort, hence we assign them here.
+ //
+ // Assign the inPort and outPort for the Flow Entries
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ // Set the inPort
+ do {
+ if (flowEntry.inPort() != null)
+ break;
+ if (flowEntry.flowEntryMatch() == null)
+ break;
+ Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
+ flowEntry.setInPort(inPort);
+ } while (false);
- allFlowPaths.put(flowPath.flowId().value(), flowPath);
-
- //
- // TODO: Bug workaround / fix :
- // method FlowDatabaseOperation.extractFlowEntry() doesn't
- // fetch the inPort and outPort, hence we assign them here.
- //
- // Assign the inPort and outPort for the Flow Entries
- for (FlowEntry flowEntry : flowPath.flowEntries()) {
- // Set the inPort
- do {
- if (flowEntry.inPort() != null)
- break;
- if (flowEntry.flowEntryMatch() == null)
- break;
- Port inPort = new Port(flowEntry.flowEntryMatch().inPort().value());
- flowEntry.setInPort(inPort);
- } while (false);
-
- // Set the outPort
- do {
- if (flowEntry.outPort() != null)
- break;
- for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
- if (fa.actionOutput() != null) {
- Port outPort = new Port(fa.actionOutput().port().value());
- flowEntry.setOutPort(outPort);
- break;
- }
- }
- } while (false);
+ // Set the outPort
+ do {
+ if (flowEntry.outPort() != null)
+ break;
+ for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+ if (fa.actionOutput() != null) {
+ Port outPort = new Port(fa.actionOutput().port().value());
+ flowEntry.setOutPort(outPort);
+ break;
+ }
}
- }
+ } while (false);
}
}
@@ -509,30 +491,92 @@
* @param mySwitches the collection of my switches.
*/
private void processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowId> shouldFetchMyFlowIds = new LinkedList<FlowId>();
+
//
- // Automatically add all Flow ID events (for the Flows this instance
- // is responsible for) to the collection of Flows to recompute.
+ // Process all Flow Id events and update the appropriate state
//
for (EventEntry<FlowId> eventEntry : flowIdEvents) {
FlowId flowId = eventEntry.eventData();
log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
- FlowPath flowPath = allFlowPaths.get(flowId.value());
- if (flowPath == null) {
- if (! topologyEvents.isEmpty())
- continue; // Optimization: Not my flow
- Dpid dpid = FlowDatabaseOperation.getFlowSourceDpid(dbHandler,
- flowId);
- if ((dpid != null) && (mySwitches.get(dpid.value()) != null)) {
- flowPath = FlowDatabaseOperation.getFlow(dbHandler,
- flowId);
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow Path
+ //
+ if (allFlowPaths.get(flowId.value()) != null) {
+ //
+ // TODO: What to do if the Flow Path already exists?
+ // For now, we just re-add it.
+ //
}
+ shouldFetchMyFlowIds.add(flowId);
+
+ break;
}
- if (flowPath != null) {
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow Path.
+ //
+ // Find the Flow Path, and mark the Flow and its Flow Entries
+ // for deletion.
+ //
+ FlowPath existingFlowPath =
+ allFlowPaths.get(flowId.value());
+ if (existingFlowPath == null)
+ continue; // Nothing to do
+
+ existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+ for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+
+ // Remove the Flow Path from the internal state
+ Long key = existingFlowPath.flowId().value();
+ allFlowPaths.remove(key);
+ shouldRecomputeFlowPaths.remove(key);
+ modifiedFlowPaths.put(key, existingFlowPath);
+
+ break;
+ }
+ }
+ }
+
+ // Get my Flows
+ Collection<FlowPath> myFlows =
+ FlowDatabaseOperation.getFlows(dbHandler, shouldFetchMyFlowIds);
+
+ for (FlowPath flowPath : myFlows) {
+ fixFlowFromDatabase(flowPath);
+
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_SHORTEST_PATH:
+ //
+ // Reset the Data Path, in case it was set already, because
+ // we are going to recompute it anyway.
+ //
+ flowPath.flowEntries().clear();
shouldRecomputeFlowPaths.put(flowPath.flowId().value(),
flowPath);
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ //
+ // Mark all Flow Entries for installation in the switches.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ break;
+ case FP_TYPE_UNKNOWN:
+ log.error("FlowPath event with unknown type");
+ break;
}
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
}
}
@@ -674,33 +718,6 @@
}
log.debug("[BEFORE] {}", topology.toString());
-
- //
- // TODO: Fake the unconditional topology read by checking the cache
- // with the old topology and ignoring topology events that don't
- // make any impact to the topology.
- // This is needed aa workaround: if a port is down, we get
- // up to three additional "Port Down" or "Link Down" events.
- //
- for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
- TopologyElement topologyElement = eventEntry.eventData();
-
- switch (eventEntry.eventType()) {
- case ENTRY_ADD:
- isTopologyModified |= topology.addTopologyElement(topologyElement);
- break;
- case ENTRY_REMOVE:
- isTopologyModified |= topology.removeTopologyElement(topologyElement);
- break;
- }
- if (isTopologyModified)
- break;
- }
- if (! isTopologyModified) {
- log.debug("Ignoring topology events that don't modify the topology");
- return;
- }
-
topology.readFromDatabase(dbHandler);
log.debug("[AFTER] {}", topology.toString());
shouldRecomputeFlowPaths.putAll(allFlowPaths);