Finish the implementation of the new logic for computing and installing
the Flows.
NOTE: The code is untested and disabled by default.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 9979664..9d37a63 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -240,12 +240,61 @@
private void processEvents() {
Collection<FlowEntry> modifiedFlowEntries;
+ if (enableOnrc2014MeasurementsFlows) {
+
+ if (topologyEvents.isEmpty() && flowIdEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ // Fetch and prepare my flows
+ prepareMyFlows(mySwitches);
+
+ // Fetch the topology
+ processTopologyEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : shouldRecomputeFlowPaths.values()) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ // Assign the Flow Entry ID as needed
+ for (FlowPath flowPath : modifiedFlowPaths.values()) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ // Extract my modified Flow Entries
+ modifiedFlowEntries = processFlowIdEvents(mySwitches);
+
+ //
+ // Push the modified state to the Flow Manager
+ //
+ flowManager.pushModifiedFlowState(modifiedFlowPaths.values(),
+ modifiedFlowEntries);
+
+ // Cleanup
+ topologyEvents.clear();
+ flowIdEvents.clear();
+ //
+ allFlowPaths.clear();
+ shouldRecomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+
+ return;
+ }
+
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
- flowEntryEvents.isEmpty() && flowIdEvents.isEmpty()) {
+ flowEntryEvents.isEmpty()) {
return; // Nothing to do
}
- processFlowIdEvents();
processFlowPathEvents();
processTopologyEvents();
processUnmatchedFlowEntryAdd();
@@ -285,7 +334,6 @@
topologyEvents.clear();
flowPathEvents.clear();
flowEntryEvents.clear();
- flowIdEvents.clear();
//
shouldRecomputeFlowPaths.clear();
modifiedFlowPaths.clear();
@@ -378,9 +426,27 @@
}
/**
- * Process the Flow ID events.
+ * Prepare my flows.
+ *
+ * @param mySwitches the collection of my switches.
*/
- private void processFlowIdEvents() {
+ private void prepareMyFlows(Map<Long, IOFSwitch> mySwitches) {
+ ArrayList<FlowPath> myFlows = flowManager.getAllMyFlows(mySwitches);
+ for (FlowPath flowPath : myFlows) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+ }
+
+ /**
+ * Process the Flow ID events.
+ *
+ * @param mySwitches the collection of my switches.
+ * @return a collection of modified Flow Entries this instance needs
+ * to push to its own switches.
+ */
+ private Collection<FlowEntry> processFlowIdEvents(Map<Long, IOFSwitch> mySwitches) {
+ List<FlowEntry> modifiedFlowEntries = new LinkedList<FlowEntry>();
+
//
// Process all Flow ID events and update the appropriate state
//
@@ -389,28 +455,34 @@
log.debug("Flow ID Event: {} {}", eventEntry.eventType(), flowId);
- switch (eventEntry.eventType()) {
- case ENTRY_ADD: {
- //
- // Add a new Flow ID
- //
- // TODO: Implement it!
-
- break;
+ //
+ // Lookup the Flow ID in the Flows that were read from the
+ // database in in a previous step. If not found, read from
+ // the database.
+ //
+ FlowPath flowPath = allFlowPaths.get(flowId.value());
+ if (flowPath == null)
+ flowPath = flowManager.getFlow(flowId);
+ if (flowPath == null) {
+ log.debug("Flow ID {} : Flow not found!", flowId);
+ continue;
}
- case ENTRY_REMOVE: {
- //
- // Remove an existing Flow ID.
- //
- // TODO: Implement it!
-
- break;
- }
+ // Collect only my flow entries that are not updated.
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() !=
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ continue;
+ }
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+ modifiedFlowEntries.add(flowEntry);
}
}
- }
+ return modifiedFlowEntries;
+ }
/**
* Process the Flow Path events.
@@ -498,31 +570,34 @@
* Process the Topology events.
*/
private void processTopologyEvents() {
+ if (enableOnrc2014MeasurementsTopology) {
+ if (topologyEvents.isEmpty())
+ return;
+ log.debug("[BEFORE] {}", topology.toString());
+ topology.readFromDatabase(dbHandler);
+ log.debug("[AFTER] {}", topology.toString());
+ shouldRecomputeFlowPaths.putAll(allFlowPaths);
+ return;
+ }
+
//
// Process all Topology events and update the appropriate state
//
boolean isTopologyModified = false;
- if (enableOnrc2014MeasurementsTopology) {
- log.debug("[BEFORE] {}", topology.toString());
- topology.readFromDatabase(dbHandler);
- isTopologyModified = true;
- log.debug("[AFTER] {}", topology.toString());
- } else {
- for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
- TopologyElement topologyElement = eventEntry.eventData();
+ for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+ TopologyElement topologyElement = eventEntry.eventData();
- log.debug("Topology Event: {} {}", eventEntry.eventType(),
- topologyElement.toString());
+ log.debug("Topology Event: {} {}", eventEntry.eventType(),
+ topologyElement.toString());
- switch (eventEntry.eventType()) {
- case ENTRY_ADD:
- isTopologyModified |= topology.addTopologyElement(topologyElement);
- break;
- case ENTRY_REMOVE:
- isTopologyModified |= topology.removeTopologyElement(topologyElement);
- break;
- }
- }
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ isTopologyModified |= topology.addTopologyElement(topologyElement);
+ break;
+ case ENTRY_REMOVE:
+ isTopologyModified |= topology.removeTopologyElement(topologyElement);
+ break;
+ }
}
if (isTopologyModified) {
// TODO: For now, if the topology changes, we recompute all Flows