Merge branch 'master' of https://github.com/OPENNETWORKINGLAB/ONOS
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index d04e50a..d9fb7c3 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -82,7 +82,7 @@
*
* The datagrid map is:
* - Key : Flow ID (Long)
- * - Value : Serialized Flow (byte[])
+ * - Value : Serialized FlowPath (byte[])
*/
class MapFlowListener implements EntryListener<Long, byte[]> {
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
index cb1e678..0e9887a 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -10,6 +10,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import net.floodlightcontroller.core.IOFSwitch;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
@@ -31,6 +32,19 @@
import org.slf4j.LoggerFactory;
/**
+ * A class for storing a pair of Flow Path and a Flow Entry.
+ */
+class FlowPathEntryPair {
+ protected FlowPath flowPath;
+ protected FlowEntry flowEntry;
+
+ protected FlowPathEntryPair(FlowPath flowPath, FlowEntry flowEntry) {
+ this.flowPath = flowPath;
+ this.flowEntry = flowEntry;
+ }
+}
+
+/**
* Class for FlowPath Maintenance.
* This class listens for FlowEvents to:
* - Maintain a local cache of the Network Topology.
@@ -45,8 +59,8 @@
private IDatagridService datagridService; // The Datagrid Service to use
private Topology topology; // The network topology
private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
- private List<FlowEntry> unmatchedFlowEntryUpdates =
- new LinkedList<FlowEntry>();
+ private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+ new HashMap<Long, FlowEntry>();
// The queue with Flow Path and Topology Element updates
private BlockingQueue<EventEntry<?>> networkEvents =
@@ -60,6 +74,22 @@
private List<EventEntry<FlowEntry>> flowEntryEvents =
new LinkedList<EventEntry<FlowEntry>>();
+ //
+ // Transient state for processing the Flow Paths:
+ // - The new Flow Paths
+ // - The Flow Paths that should be recomputed
+ // - The Flow Paths with modified Flow Entries
+ // - The Flow Entries that were updated
+ //
+ private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPathEntryPair> updatedFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+ private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+
+
/**
* Constructor for a given Flow Manager and Datagrid Service.
*
@@ -81,10 +111,9 @@
protected Topology getTopology() { return this.topology; }
/**
- * Run the thread.
+ * Startup processing.
*/
- @Override
- public void run() {
+ private void startup() {
//
// Obtain the initial Topology state
//
@@ -116,6 +145,14 @@
// Process the initial events (if any)
processEvents();
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ startup();
//
// The main loop
@@ -162,17 +199,116 @@
* Process the events (if any)
*/
private void processEvents() {
- List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
- List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
- List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+ List<FlowPathEntryPair> modifiedFlowEntries;
if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
flowEntryEvents.isEmpty()) {
return; // Nothing to do
}
+ processFlowPathEvents();
+ processTopologyEvents();
//
- // Process the Flow Path events
+ // Add all new Flows: should be done after processing the Flow Path
+ // and Topology events.
+ //
+ for (FlowPath flowPath : newFlowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ processFlowEntryEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : recomputeFlowPaths) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.add(flowPath);
+ }
+
+ modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
+
+ // Assign missing Flow Entry IDs
+ assignFlowEntryId(modifiedFlowEntries);
+
+ //
+ // Push the modified Flow Entries to switches, datagrid and database
+ //
+ flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
+
+ //
+ // Remove Flow Entries that were deleted
+ //
+ for (FlowPath flowPath : modifiedFlowPaths)
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ // Cleanup
+ topologyEvents.clear();
+ flowPathEvents.clear();
+ flowEntryEvents.clear();
+ //
+ newFlowPaths.clear();
+ recomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+ updatedFlowEntries.clear();
+ unmatchedDeleteFlowEntries.clear();
+ }
+
+ /**
+ * Extract the modified Flow Entries.
+ */
+ private List<FlowPathEntryPair> extractModifiedFlowEntries(
+ List<FlowPath> modifiedFlowPaths) {
+ List<FlowPathEntryPair> modifiedFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+
+ // Extract only the modified Flow Entries
+ for (FlowPath flowPath : modifiedFlowPaths) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() ==
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ FlowPathEntryPair flowPair =
+ new FlowPathEntryPair(flowPath, flowEntry);
+ modifiedFlowEntries.add(flowPair);
+ }
+ }
+ }
+ return modifiedFlowEntries;
+ }
+
+ /**
+ * Assign the Flow Entry ID as needed.
+ */
+ private void assignFlowEntryId(List<FlowPathEntryPair> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ //
+ // Assign the Flow Entry ID only for Flow Entries for my switches
+ //
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowEntry flowEntry = flowPair.flowEntry;
+ // Update the Flow Entries only for my switches
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ /**
+ * Process the Flow Path events.
+ */
+ private void processFlowPathEvents() {
+ //
+ // Process all Flow Path events and update the appropriate state
//
for (EventEntry<FlowPath> eventEntry : flowPathEvents) {
FlowPath flowPath = eventEntry.eventData();
@@ -243,9 +379,14 @@
}
}
}
+ }
+ /**
+ * Process the Topology events.
+ */
+ private void processTopologyEvents() {
//
- // Process the topology events
+ // Process all Topology events and update the appropriate state
//
boolean isTopologyModified = false;
for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
@@ -267,97 +408,114 @@
// TODO: For now, if the topology changes, we recompute all Flows
recomputeFlowPaths.addAll(allFlowPaths.values());
}
-
- // Add all new Flows
- for (FlowPath flowPath : newFlowPaths) {
- allFlowPaths.put(flowPath.flowId().value(), flowPath);
- }
-
- // Recompute all affected Flow Paths and keep only the modified
- for (FlowPath flowPath : recomputeFlowPaths) {
- if (recomputeFlowPath(flowPath))
- modifiedFlowPaths.add(flowPath);
- }
-
- //
- // Process previously unmatched Flow Entry updates
- //
- if ((! flowPathEvents.isEmpty()) && (! unmatchedFlowEntryUpdates.isEmpty())) {
- List<FlowEntry> remainingUpdates = new LinkedList<FlowEntry>();
- for (FlowEntry flowEntry : unmatchedFlowEntryUpdates) {
- if (! updateFlowEntry(flowEntry))
- remainingUpdates.add(flowEntry);
- }
- unmatchedFlowEntryUpdates = remainingUpdates;
- }
-
- //
- // Process the Flow Entry events
- //
- for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
- FlowEntry flowEntry = eventEntry.eventData();
- switch (eventEntry.eventType()) {
- case ENTRY_ADD:
- //
- // Find the corresponding Flow Entry and update it.
- // If not found, then keep it in a local cache for
- // later matching.
- //
- if (! updateFlowEntry(flowEntry))
- unmatchedFlowEntryUpdates.add(flowEntry);
- break;
- case ENTRY_REMOVE:
- //
- // NOTE: For now we remove the Flow Entries based on
- // local decisions, so no need to remove them because of an
- // external event.
- //
- break;
- }
- }
-
- //
- // Push the Flow Entries that have been modified
- //
- flowManager.pushModifiedFlowEntries(modifiedFlowPaths);
-
- // Cleanup
- topologyEvents.clear();
- flowPathEvents.clear();
- flowEntryEvents.clear();
}
/**
- * Update a Flow Entry because of an external event.
- *
- * @param flowEntry the FlowEntry with the new state.
- * @return true if the Flow Entry was found and updated, otherwise false.
+ * Process the Flow Entry events.
*/
- private boolean updateFlowEntry(FlowEntry flowEntry) {
- if ((! flowEntry.isValidFlowId()) ||
- (! flowEntry.isValidFlowEntryId())) {
- //
- // Ignore events for Flow Entries with invalid Flow ID or
- // Flow Entry ID.
- // This shouldn't happen.
- //
- return true;
+ private void processFlowEntryEvents() {
+ FlowPathEntryPair flowPair;
+ FlowPath flowPath;
+ FlowEntry updatedFlowEntry;
+
+ //
+ // Update Flow Entries with previously unmatched Flow Entry updates
+ //
+ if (! unmatchedFlowEntryAdd.isEmpty()) {
+ Map<Long, FlowEntry> remainingUpdates = new HashMap<Long, FlowEntry>();
+ for (FlowEntry flowEntry : unmatchedFlowEntryAdd.values()) {
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null)
+ continue;
+ updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ remainingUpdates.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ continue;
+ }
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ }
+ unmatchedFlowEntryAdd = remainingUpdates;
}
- FlowPath flowPath = allFlowPaths.get(flowEntry.flowId().value());
- if (flowPath == null)
- return false;
-
//
- // Iterate over all Flow Entries and find a match based on the DPID
+ // Process all Flow Entry events and update the appropriate state
+ //
+ for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
+ FlowEntry flowEntry = eventEntry.eventData();
+
+ log.debug("Flow Entry Event: {} {}", eventEntry.eventType(),
+ flowEntry.toString());
+
+ if ((! flowEntry.isValidFlowId()) ||
+ (! flowEntry.isValidFlowEntryId())) {
+ continue;
+ }
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null) {
+ // Flow Path not found: keep the entry for later matching
+ unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ break;
+ }
+ updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ // Flow Entry not found: keep the entry for later matching
+ unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ break;
+ }
+ // Add the updated entry to the list of updated Flow Entries
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ break;
+
+ case ENTRY_REMOVE:
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
+ continue; // Match found
+ }
+
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null) {
+ // Flow Path not found: ignore the update
+ break;
+ }
+ updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ // Flow Entry not found: add to list of deleted entries
+ flowPair = new FlowPathEntryPair(flowPath, flowEntry);
+ unmatchedDeleteFlowEntries.add(flowPair);
+ break;
+ }
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Update a Flow Entry because of an external ENTRY_ADD event.
+ *
+ * @param flowPath the FlowPath for the Flow Entry to update.
+ * @param flowEntry the FlowEntry with the new state.
+ * @return the updated Flow Entry if found, otherwise null.
+ */
+ private FlowEntry updateFlowEntryAdd(FlowPath flowPath,
+ FlowEntry flowEntry) {
+ //
+ // Iterate over all Flow Entries and find a match.
//
for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
- if (localFlowEntry.dpid().value() != flowEntry.dpid().value())
+ if (! TopologyManager.isSameFlowEntryDataPath(localFlowEntry,
+ flowEntry)) {
continue;
- //
- // TODO: We might want to check the FlowEntryMatch and
- // FlowEntryActions to double-check it is the same Flow Entry
- //
+ }
//
// Local Flow Entry match found
@@ -367,9 +525,9 @@
flowEntry.flowEntryId().value()) {
//
// Find a local Flow Entry, but the Flow Entry ID doesn't
- // match. Ignore the event.
+ // match. Keep looking.
//
- return true;
+ continue;
}
} else {
// Update the Flow Entry ID
@@ -380,13 +538,44 @@
//
// Update the local Flow Entry.
- // For now we update only the Flow Entry Switch State
//
+ localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
- return true;
+ return localFlowEntry;
}
- return false; // Entry not found
+ return null; // Entry not found
+ }
+
+ /**
+ * Update a Flow Entry because of an external ENTRY_REMOVE event.
+ *
+ * @param flowPath the FlowPath for the Flow Entry to update.
+ * @param flowEntry the FlowEntry with the new state.
+ * @return the updated Flow Entry if found, otherwise null.
+ */
+ private FlowEntry updateFlowEntryRemove(FlowPath flowPath,
+ FlowEntry flowEntry) {
+ //
+ // Iterate over all Flow Entries and find a match based on
+ // the Flow Entry ID.
+ //
+ for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+ if (! localFlowEntry.isValidFlowEntryId())
+ continue;
+ if (localFlowEntry.flowEntryId().value() !=
+ flowEntry.flowEntryId().value()) {
+ continue;
+ }
+ //
+ // Update the local Flow Entry.
+ //
+ localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
+ localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+ return localFlowEntry;
+ }
+
+ return null; // Entry not found
}
/**
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 98f9cce..e239ae9 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -41,6 +41,12 @@
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+ //
+ // TODO: A temporary variable to switch between the poll-based and
+ // notification mechanism for the Flow Manager.
+ //
+ private final static boolean enableNotifications = false;
+
protected GraphDBOperation dbHandlerApi;
protected GraphDBOperation dbHandlerInner;
@@ -506,10 +512,12 @@
// Schedule the threads and periodic tasks
flowEventHandler.start();
- mapReaderScheduler.scheduleAtFixedRate(
+ if (! enableNotifications) {
+ mapReaderScheduler.scheduleAtFixedRate(
mapReader, 3, 3, TimeUnit.SECONDS);
- shortestPathReconcileScheduler.scheduleAtFixedRate(
+ shortestPathReconcileScheduler.scheduleAtFixedRate(
shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
+ }
}
/**
@@ -718,6 +726,15 @@
}
/**
+ * Get the collection of my switches.
+ *
+ * @return the collection of my switches.
+ */
+ public Map<Long, IOFSwitch> getMySwitches() {
+ return floodlightProvider.getSwitches();
+ }
+
+ /**
* Get the network topology.
*
* @return the network topology.
@@ -855,126 +872,179 @@
}
/**
- * Push the modified Flow Entries of a collection of Flow Paths.
- * Only the Flow Entries to switches controlled by this instance
+ * Push modified Flow Entries to switches.
+ *
+ * NOTE: Only the Flow Entries to switches controlled by this instance
* are pushed.
*
- * NOTE: Currently, we write to both the Network MAP and the switches.
- *
- * @param modifiedFlowPaths the collection of Flow Paths with the modified
- * Flow Entries.
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
*/
- public void pushModifiedFlowEntries(Collection<FlowPath> modifiedFlowPaths) {
+ public void pushModifiedFlowEntriesToSwitches(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
// TODO: For now, the pushing of Flow Entries is disabled
- if (true)
+ if (! enableNotifications)
return;
- if (modifiedFlowPaths.isEmpty())
+ if (modifiedFlowEntries.isEmpty())
return;
- Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
- for (FlowPath flowPath : modifiedFlowPaths) {
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowPath flowPath = flowPair.flowPath;
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+
+ log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+
//
- // Find the Flow Path in the Network MAP.
- // NOTE: The Flow Path might not be found if the Flow was just
- // removed by some other controller instance.
+ // Install the Flow Entry into the switch
//
- IFlowPath flowObj = dbHandlerInner.searchFlowPath(flowPath.flowId());
+ if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+ String logMsg = "Cannot install Flow Entry " +
+ flowEntry.flowEntryId() +
+ " from Flow Path " + flowPath.flowId() +
+ " on switch " + flowEntry.dpid();
+ log.error(logMsg);
+ continue;
+ }
- boolean isFlowEntryDeleted = false;
- for (FlowEntry flowEntry : flowPath.flowEntries()) {
- log.debug("Updating Flow Entry: {}", flowEntry.toString());
+ //
+ // NOTE: Here we assume that the switch has been
+ // successfully updated.
+ //
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ }
+ }
- if (flowEntry.flowEntrySwitchState() !=
- FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
- continue; // No need to update the entry
- }
- if (flowEntry.flowEntryUserState() ==
+ /**
+ * Push modified Flow Entries to the datagrid.
+ *
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
+ */
+ public void pushModifiedFlowEntriesToDatagrid(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ // TODO: For now, the pushing of Flow Entries is disabled
+ if (! enableNotifications)
+ return;
+
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+
+ //
+ // TODO: For now Flow Entries are removed by all instances,
+ // even if this Flow Entry is not for our switches.
+ //
+ // This is needed to handle the case a switch going down:
+ // it has no Master controller instance, hence no
+ // controller instance will cleanup its flow entries.
+ // This is sub-optimal: we need to elect a controller
+ // instance to handle the cleanup of such orphaned flow
+ // entries.
+ //
+ if (mySwitch == null) {
+ if (flowEntry.flowEntryUserState() !=
FlowEntryUserState.FE_USER_DELETE) {
- isFlowEntryDeleted = true;
- }
-
- //
- // Install the Flow Entries into my switches
- //
- IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
- if (mySwitch != null) {
- //
- // Assign the FlowEntry ID if needed
- //
- if (! flowEntry.isValidFlowEntryId()) {
- long id = getNextFlowEntryId();
- flowEntry.setFlowEntryId(new FlowEntryId(id));
- }
-
- //
- // Install the Flow Entry into the switch
- //
- if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
- String logMsg = "Cannot install Flow Entry " +
- flowEntry.flowEntryId() +
- " from Flow Path " + flowPath.flowId() +
- " on switch " + flowEntry.dpid();
- log.error(logMsg);
- continue;
- }
-
- //
- // NOTE: Here we assume that the switch has been
- // successfully updated.
- //
- flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
- }
-
- //
- // TODO: For now Flow Entries are removed from the Datagrid
- // and from the Network Map by all instances, even if this
- // Flow Entry is not for our switches.
- //
- // This is needed to handle the case a switch going down:
- // it has no Master controller instance, hence no
- // controller instance will cleanup its flow entries.
- // This is sub-optimal: we need to elect a controller
- // instance to handle the cleanup of such orphaned flow
- // entries.
- //
- if (mySwitch == null) {
- if (flowEntry.flowEntryUserState() !=
- FlowEntryUserState.FE_USER_DELETE) {
- continue;
- }
- if (! flowEntry.isValidFlowEntryId())
- continue;
- }
-
- //
- // Write the Flow Entry to the Datagrid
- //
- switch (flowEntry.flowEntryUserState()) {
- case FE_USER_ADD:
- if (mySwitch == null)
- break; // Install only flow entries for my switches
- datagridService.notificationSendFlowEntryAdded(flowEntry);
- break;
- case FE_USER_MODIFY:
- if (mySwitch == null)
- break; // Install only flow entries for my switches
- datagridService.notificationSendFlowEntryUpdated(flowEntry);
- break;
- case FE_USER_DELETE:
- datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
- break;
- }
-
- //
- // Write the Flow Entry to the Network Map
- //
- if (flowObj == null) {
- String logMsg = "Cannot find Network MAP entry for Flow Path " + flowPath.flowId();
continue;
}
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+ }
+
+ log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ if (mySwitch == null)
+ break; // Install only flow entries for my switches
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ if (mySwitch == null)
+ break; // Install only flow entries for my switches
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ }
+ }
+ }
+
+ /**
+ * Push Flow Entries to the Network MAP.
+ *
+ * NOTE: The Flow Entries are pushed only on the instance responsible
+ * for the first switch. This is to avoid database errors when multiple
+ * instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries to push.
+ */
+ public void pushModifiedFlowEntriesToDatabase(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ // TODO: For now, the pushing of Flow Entries is disabled
+ if (! enableNotifications)
+ return;
+
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowPath flowPath = flowPair.flowPath;
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+
+ //
+ // Push the changes only on the instance responsible for the
+ // first switch.
+ //
+ Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+ IOFSwitch mySrcSwitch = mySwitches.get(srcDpid.value());
+ if (mySrcSwitch == null)
+ continue;
+
+ log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
+ //
+ // Write the Flow Entry to the Network Map
+ //
+ // NOTE: We try a number of times, in case somehow some other
+ // instances are writing at the same time.
+ // Apparently, if other instances are writing at the same time
+ // this will trigger an error.
+ //
+ for (int i = 0; i < 6; i++) {
try {
+ //
+ // Find the Flow Path in the Network MAP.
+ //
+ // NOTE: The Flow Path might not be found if the Flow was
+ // just removed by some other controller instance.
+ //
+ IFlowPath flowObj =
+ dbHandlerInner.searchFlowPath(flowEntry.flowId());
+ if (flowObj == null) {
+ String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
+ log.error(logMsg);
+ break;
+ }
+
+ // Write the Flow Entry
switch (flowEntry.flowEntryUserState()) {
case FE_USER_ADD:
// FALLTHROUGH
@@ -982,7 +1052,7 @@
if (addFlowEntry(flowObj, flowEntry) == null) {
String logMsg = "Cannot write to Network MAP Flow Entry " +
flowEntry.flowEntryId() +
- " from Flow Path " + flowPath.flowId() +
+ " from Flow Path " + flowEntry.flowId() +
" on switch " + flowEntry.dpid();
log.error(logMsg);
}
@@ -991,45 +1061,28 @@
if (deleteFlowEntry(flowObj, flowEntry) == false) {
String logMsg = "Cannot remove from Network MAP Flow Entry " +
flowEntry.flowEntryId() +
- " from Flow Path " + flowPath.flowId() +
+ " from Flow Path " + flowEntry.flowId() +
" on switch " + flowEntry.dpid();
log.error(logMsg);
}
break;
}
+
+ // Commit to the database
+ dbHandlerInner.commit();
+ break; // Success
+
} catch (Exception e) {
log.debug("Exception writing Flow Entry to Network MAP: ", e);
dbHandlerInner.rollback();
- continue;
- }
- }
-
- //
- // Remove Flow Entries that were deleted
- //
- // NOTE: We create a new ArrayList, and add only the Flow Entries
- // that are NOT FE_USER_DELETE.
- // This is sub-optimal: if it adds notable processing cost,
- // the Flow Entries container should be changed to LinkedList
- // or some other container that has O(1) cost of removing an entry.
- //
- if (isFlowEntryDeleted) {
- ArrayList<FlowEntry> newFlowEntries = new ArrayList<FlowEntry>();
- for (FlowEntry flowEntry : flowPath.flowEntries()) {
- if (flowEntry.flowEntryUserState() !=
- FlowEntryUserState.FE_USER_DELETE) {
- newFlowEntries.add(flowEntry);
+ // Wait a bit (random value [1ms, 20ms] and try again
+ int delay = 1 + randomGenerator.nextInt() % 20;
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e0) {
}
}
- flowPath.dataPath().setFlowEntries(newFlowEntries);
}
}
- // Try to commit to the database
- try {
- dbHandlerInner.commit();
- } catch (Exception e) {
- log.debug("Exception during commit of Flow Entries to Network MAP", e);
- dbHandlerInner.rollback();
- }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java b/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
index 7c6597d..044cc6d 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/util/DataPath.java
@@ -101,6 +101,41 @@
}
/**
+ * Remove Flow Entries that were deleted.
+ */
+ public void removeDeletedFlowEntries() {
+ //
+ // NOTE: We create a new ArrayList, and add only the Flow Entries
+ // that are NOT FE_USER_DELETE.
+ // This is sub-optimal: if it adds notable processing cost,
+ // the Flow Entries container should be changed to LinkedList
+ // or some other container that has O(1) cost of removing an entry.
+ //
+
+ // Test first whether any Flow Entry was deleted
+ boolean foundDeletedFlowEntry = false;
+ for (FlowEntry flowEntry : this.flowEntries) {
+ if (flowEntry.flowEntryUserState() ==
+ FlowEntryUserState.FE_USER_DELETE) {
+ foundDeletedFlowEntry = true;
+ break;
+ }
+ }
+ if (! foundDeletedFlowEntry)
+ return; // Nothing to do
+
+ // Create a new collection and exclude the deleted flow entries
+ ArrayList<FlowEntry> newFlowEntries = new ArrayList<FlowEntry>();
+ for (FlowEntry flowEntry : this.flowEntries()) {
+ if (flowEntry.flowEntryUserState() !=
+ FlowEntryUserState.FE_USER_DELETE) {
+ newFlowEntries.add(flowEntry);
+ }
+ }
+ setFlowEntries(newFlowEntries);
+ }
+
+ /**
* Get a string with the summary of the shortest-path data path
* computation.
*