* Transmit and process the FlowEntry events
* Explicitly set the FlowId for the Flow Entries, because it is needed/used
to match a FlowEntry to the corresponding FlowPath.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index 08f57ed..29deb94 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -21,9 +21,11 @@
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowId;
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.FlowPathUserState;
@@ -41,6 +43,8 @@
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
+ private List<FlowEntry> unmatchedFlowEntryUpdates =
+ new LinkedList<FlowEntry>();
// The queue with Flow Path and Topology Element updates
private BlockingQueue<EventEntry<?>> networkEvents =
@@ -153,9 +157,6 @@
List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
- // TODO: For now we don't use/process the FlowEntry events
- flowEntryEvents.clear();
-
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
@@ -263,6 +264,43 @@
}
//
+ // Process previously unmatched Flow Entry updates
+ //
+ if ((! flowPathEvents.isEmpty()) && (! unmatchedFlowEntryUpdates.isEmpty())) {
+ List<FlowEntry> remainingUpdates = new LinkedList<FlowEntry>();
+ for (FlowEntry flowEntry : unmatchedFlowEntryUpdates) {
+ if (! updateFlowEntry(flowEntry))
+ remainingUpdates.add(flowEntry);
+ }
+ unmatchedFlowEntryUpdates = remainingUpdates;
+ }
+
+ //
+ // Process the Flow Entry events
+ //
+ for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
+ FlowEntry flowEntry = eventEntry.eventData();
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ //
+ // Find the corresponding Flow Entry and update it.
+ // If not found, then keep it in a local cache for
+ // later matching.
+ //
+ if (! updateFlowEntry(flowEntry))
+ unmatchedFlowEntryUpdates.add(flowEntry);
+ break;
+ case ENTRY_REMOVE:
+ //
+ // NOTE: For now we remove the Flow Entries based on
+ // local decisions, so no need to remove them because of an
+ // external event.
+ //
+ break;
+ }
+ }
+
+ //
// Push the Flow Entries that have been modified
//
flowManager.pushModifiedFlowEntries(modifiedFlowPaths);
@@ -270,6 +308,69 @@
// Cleanup
topologyEvents.clear();
flowPathEvents.clear();
+ flowEntryEvents.clear();
+ }
+
+ /**
+ * Update a Flow Entry because of an external event.
+ *
+ * @param flowEntry the FlowEntry with the new state.
+ * @return true if the Flow Entry was found and updated, otherwise false.
+ */
+ private boolean updateFlowEntry(FlowEntry flowEntry) {
+ if ((! flowEntry.isValidFlowId()) ||
+ (! flowEntry.isValidFlowEntryId())) {
+ //
+ // Ignore events for Flow Entries with invalid Flow ID or
+ // Flow Entry ID.
+ // This shouldn't happen.
+ //
+ return true;
+ }
+
+ FlowPath flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null)
+ return false;
+
+ //
+ // Iterate over all Flow Entries and find a match based on the DPID
+ //
+ for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+ if (localFlowEntry.dpid().value() != flowEntry.dpid().value())
+ continue;
+ //
+ // TODO: We might want to check the FlowEntryMatch and
+ // FlowEntryActions to double-check it is the same Flow Entry
+ //
+
+ //
+ // Local Flow Entry match found
+ //
+ if (localFlowEntry.isValidFlowEntryId()) {
+ if (localFlowEntry.flowEntryId().value() !=
+ flowEntry.flowEntryId().value()) {
+ //
+ // Find a local Flow Entry, but the Flow Entry ID doesn't
+ // match. Ignore the event.
+ //
+ return true;
+ }
+ } else {
+ // Update the Flow Entry ID
+ FlowEntryId flowEntryId =
+ new FlowEntryId(flowEntry.flowEntryId().value());
+ localFlowEntry.setFlowEntryId(flowEntryId);
+ }
+
+ //
+ // Update the local Flow Entry.
+ // For now we update only the Flow Entry Switch State
+ //
+ localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+ return true;
+ }
+
+ return false; // Entry not found
}
/**
@@ -395,6 +496,12 @@
//
// Add the new Flow Entry
//
+ //
+ // NOTE: Assign only the Flow ID.
+ // The Flow Entry ID is assigned later only for the Flow Entries
+ // this instance is responsible for.
+ //
+ newFlowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
// Set the incoming port matching
FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index c14772d..4465835 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -519,7 +519,7 @@
@Override
public boolean addFlow(FlowPath flowPath, FlowId flowId) {
//
- // NOTE: We need to explicitly initialize the Flow Entry Switch State,
+ // NOTE: We need to explicitly initialize some of the state,
// in case the application didn't do it.
//
for (FlowEntry flowEntry : flowPath.flowEntries()) {
@@ -527,6 +527,8 @@
FlowEntrySwitchState.FE_SWITCH_UNKNOWN) {
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
+ if (! flowEntry.isValidFlowId())
+ flowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
}
if (FlowDatabaseOperation.addFlow(this, dbHandler, flowPath, flowId)) {
@@ -707,6 +709,7 @@
* @return true on success, otherwise false.
*/
private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
+ String flowIdStr = flowObj.getFlowId();
//
// Set the incoming port matching and the outgoing port output
@@ -714,6 +717,8 @@
//
int idx = 0;
for (FlowEntry flowEntry : newDataPath.flowEntries()) {
+ flowEntry.setFlowId(new FlowId(flowIdStr));
+
// Mark the Flow Entry as not updated in the switch
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
// Set the incoming port matching
@@ -864,8 +869,7 @@
//
// Assign the FlowEntry ID if needed
//
- if ((flowEntry.flowEntryId() == null) ||
- (flowEntry.flowEntryId().value() == 0)) {
+ if (! flowEntry.isValidFlowEntryId()) {
long id = getNextFlowEntryId();
flowEntry.setFlowEntryId(new FlowEntryId(id));
}
@@ -887,6 +891,20 @@
// updated.
//
flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ }
//
// Write the Flow Entry to the Network Map