Cleanup the FlowManager: remove unused code that implemented
the periodic polling mechanism.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index 7e46b54..b75b603 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -8,7 +8,6 @@
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -29,7 +28,6 @@
import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
-import net.onrc.onos.ofcontroller.topology.ITopologyNetService;
import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.*;
@@ -41,13 +39,6 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
-
- //
- // TODO: A temporary variable to switch between the poll-based and
- // notification mechanism for the Flow Manager.
- //
- private final static boolean enableNotifications = true;
-
// flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
private final static boolean enableFlowPusher = false;
@@ -55,7 +46,6 @@
protected GraphDBOperation dbHandlerInner;
protected volatile IFloodlightProviderService floodlightProvider;
- protected volatile ITopologyNetService topologyNetService;
protected volatile IDatagridService datagridService;
protected IRestApiService restApi;
protected FloodlightModuleContext context;
@@ -76,7 +66,6 @@
private static Random randomGenerator = new Random();
private static int nextFlowEntryIdPrefix = 0;
private static int nextFlowEntryIdSuffix = 0;
- private static long nextFlowEntryId = 0;
/** The logger. */
private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
@@ -86,305 +75,6 @@
new LinkedBlockingQueue<FlowPathEntryPair>();
FlowDatabaseWriter flowDatabaseWriter;
- // The periodic task(s)
- private ScheduledExecutorService mapReaderScheduler;
- private ScheduledExecutorService shortestPathReconcileScheduler;
-
- /**
- * Periodic task for reading the Flow Entries and pushing changes
- * into the switches.
- */
- final Runnable mapReader = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowEntries = 0;
- int counterMyNotUpdatedFlowEntries = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowEntry> addFlowEntries =
- new LinkedList<IFlowEntry>();
- LinkedList<IFlowEntry> deleteFlowEntries =
- new LinkedList<IFlowEntry>();
-
- //
- // Fetch all Flow Entries which need to be updated and select
- // only my Flow Entries that need to be updated into the
- // switches.
- //
- Iterable<IFlowEntry> allFlowEntries =
- dbHandlerInner.getAllSwitchNotUpdatedFlowEntries();
- for (IFlowEntry flowEntryObj : allFlowEntries) {
- log.debug("flowEntryobj : {}", flowEntryObj);
-
- counterAllFlowEntries++;
-
- String dpidStr = flowEntryObj.getSwitchDpid();
- if (dpidStr == null)
- continue;
- Dpid dpid = new Dpid(dpidStr);
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Ignore the entry: not my switch
-
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- //
- // NOTE: For now we process the DELETE before the ADD
- // to cover the more common scenario.
- // TODO: This is error prone and needs to be fixed!
- //
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- continue;
- if (userState.equals("FE_USER_DELETE")) {
- // An entry that needs to be deleted.
- deleteFlowEntries.add(flowEntryObj);
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- } else {
- addFlowEntries.add(flowEntryObj);
- }
- counterMyNotUpdatedFlowEntries++;
- }
-
- log.debug("addFlowEntries : {}", addFlowEntries);
-
- //
- // Process the Flow Entries that need to be added
- //
- for (IFlowEntry flowEntryObj : addFlowEntries) {
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Shouldn't happen
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- }
-
- //
- // Delete all Flow Entries marked for deletion from the
- // Network MAP.
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and delete the Flow Entries after the
- // Barrier message is received.
- //
- while (! deleteFlowEntries.isEmpty()) {
- IFlowEntry flowEntryObj = deleteFlowEntries.poll();
- IFlowPath flowObj =
- dbHandlerInner.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null) {
- log.debug("Did not find FlowPath to be deleted");
- continue;
- }
- flowObj.removeFlowEntry(flowEntryObj);
- dbHandlerInner.removeFlowEntry(flowEntryObj);
- }
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowEntries * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
- counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
- counterMyNotUpdatedFlowEntries + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
- /**
- * Periodic task for reading the Flow Paths and recomputing the
- * shortest paths.
- */
- final Runnable shortestPathReconcile = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flows from the Network MAP: ", e);
- dbHandlerInner.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowPaths = 0;
- int counterMyFlowPaths = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
-
- //
- // Fetch and recompute the Shortest Path for those
- // Flow Paths this controller is responsible for.
- //
- Topology topology = topologyNetService.newDatabaseTopology();
- Iterable<IFlowPath> allFlowPaths = dbHandlerInner.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- counterAllFlowPaths++;
- if (flowPathObj == null)
- continue;
-
- String srcDpidStr = flowPathObj.getSrcSwitch();
- if (srcDpidStr == null)
- continue;
- Dpid srcDpid = new Dpid(srcDpidStr);
- //
- // Use the source DPID as a heuristic to decide
- // which controller is responsible for maintaining the
- // shortest path.
- // NOTE: This heuristic is error-prone: if the switch
- // goes away and no controller is responsible for that
- // switch, then the original Flow Path is not cleaned-up
- //
- IOFSwitch mySwitch = mySwitches.get(srcDpid.value());
- if (mySwitch == null)
- continue; // Ignore: not my responsibility
-
- // Test whether we need to maintain this flow
- String flowPathTypeStr = flowPathObj.getFlowPathType();
- if (flowPathTypeStr == null)
- continue; // Could be invalid entry?
- if (! flowPathTypeStr.equals("FP_TYPE_SHORTEST_PATH"))
- continue; // No need to maintain this flow
-
- //
- // Test whether we need to complete the Flow cleanup,
- // if the Flow has been deleted by the user.
- //
- String flowPathUserStateStr = flowPathObj.getFlowPathUserState();
- if ((flowPathUserStateStr != null)
- && flowPathUserStateStr.equals("FP_USER_DELETE")) {
- Iterable<IFlowEntry> flowEntries = flowPathObj.getFlowEntries();
- final boolean empty = !flowEntries.iterator().hasNext();
- if (empty)
- deleteFlows.add(flowPathObj);
- }
-
- // Fetch the fields needed to recompute the shortest path
- String dataPathSummaryStr = flowPathObj.getDataPathSummary();
- Short srcPortShort = flowPathObj.getSrcPort();
- String dstDpidStr = flowPathObj.getDstSwitch();
- Short dstPortShort = flowPathObj.getDstPort();
- Long flowPathFlagsLong = flowPathObj.getFlowPathFlags();
- if ((dataPathSummaryStr == null) ||
- (srcPortShort == null) ||
- (dstDpidStr == null) ||
- (dstPortShort == null) ||
- (flowPathFlagsLong == null)) {
- continue;
- }
-
- Port srcPort = new Port(srcPortShort);
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(dstPortShort);
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- FlowPathType flowPathType = FlowPathType.valueOf(flowPathTypeStr);
- FlowPathUserState flowPathUserState = FlowPathUserState.valueOf(flowPathUserStateStr);
- FlowPathFlags flowPathFlags = new FlowPathFlags(flowPathFlagsLong);
-
- counterMyFlowPaths++;
-
- //
- // NOTE: Using here the regular getDatabaseShortestPath()
- // method won't work here, because that method calls
- // internally "conn.endTx(Transaction.COMMIT)", and that
- // will invalidate all handlers to the Titan database.
- // If we want to experiment with calling here
- // getDatabaseShortestPath(), we need to refactor that code
- // to avoid closing the transaction.
- //
- DataPath dataPath =
- topologyNetService.getTopologyShortestPath(
- topology,
- srcSwitchPort,
- dstSwitchPort);
- if (dataPath == null) {
- // We need the DataPath to compare the paths
- dataPath = new DataPath();
- dataPath.setSrcPort(srcSwitchPort);
- dataPath.setDstPort(dstSwitchPort);
- }
- dataPath.applyFlowPathFlags(flowPathFlags);
-
- String newDataPathSummaryStr = dataPath.dataPathSummary();
- if (dataPathSummaryStr.equals(newDataPathSummaryStr))
- continue; // Nothing changed
-
- reconcileFlow(flowPathObj, dataPath);
- }
-
- //
- // Delete all leftover Flows marked for deletion from the
- // Network MAP.
- //
- while (! deleteFlows.isEmpty()) {
- IFlowPath flowPathObj = deleteFlows.poll();
- dbHandlerInner.removeFlowPath(flowPathObj);
- }
-
- topologyNetService.dropTopology(topology);
-
- dbHandlerInner.commit();
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowPaths * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowPaths: " +
- counterAllFlowPaths + " MyFlowPaths: " +
- counterMyFlowPaths + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
-
/**
* Initialize the Flow Manager.
*
@@ -469,22 +159,18 @@
throws FloodlightModuleException {
this.context = context;
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
- topologyNetService = context.getServiceImpl(ITopologyNetService.class);
datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
if (enableFlowPusher) {
- pusher = context.getServiceImpl(IFlowPusherService.class);
+ pusher = context.getServiceImpl(IFlowPusherService.class);
} else {
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
EnumSet.of(OFType.FLOW_MOD),
OFMESSAGE_DAMPER_TIMEOUT);
}
this.init("");
-
- mapReaderScheduler = Executors.newScheduledThreadPool(1);
- shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
}
/**
@@ -531,20 +217,14 @@
flowDatabaseWriter.start();
//
- // Create the Flow Event Handler thread and register it with the
- // Datagrid Service
+ // The Flow Event Handler thread:
+ // - create
+ // - register with the Datagrid Service
+ // - startup
//
flowEventHandler = new FlowEventHandler(this, datagridService);
datagridService.registerFlowEventHandlerService(flowEventHandler);
-
- // Schedule the threads and periodic tasks
flowEventHandler.start();
- if (! enableNotifications) {
- mapReaderScheduler.scheduleAtFixedRate(
- mapReader, 3, 3, TimeUnit.SECONDS);
- shortestPathReconcileScheduler.scheduleAtFixedRate(
- shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
- }
}
/**
@@ -786,87 +466,6 @@
}
/**
- * Reconcile a flow.
- *
- * @param flowObj the flow that needs to be reconciled.
- * @param newDataPath the new data path to use.
- * @return true on success, otherwise false.
- */
- private boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
- String flowIdStr = flowObj.getFlowId();
-
- //
- // Set the incoming port matching and the outgoing port output
- // actions for each flow entry.
- //
- int idx = 0;
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- flowEntry.setFlowId(new FlowId(flowIdStr));
-
- // Mark the Flow Entry as not updated in the switch
- flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
- // Set the incoming port matching
- FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
- flowEntry.setFlowEntryMatch(flowEntryMatch);
- flowEntryMatch.enableInPort(flowEntry.inPort());
-
- //
- // Set the actions
- //
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- // If the first Flow Entry, copy the Flow Path actions to it
- //
- if (idx == 0) {
- String actionsStr = flowObj.getActions();
- if (actionsStr != null) {
- FlowEntryActions flowActions = new FlowEntryActions(actionsStr);
- for (FlowEntryAction action : flowActions.actions())
- flowEntryActions.addAction(action);
- }
- }
- idx++;
- //
- // Add the outgoing port output action
- //
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- }
-
- //
- // Remove the old Flow Entries, and add the new Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- for (IFlowEntry flowEntryObj : flowEntries) {
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- addFlowEntry(flowObj, flowEntry);
- }
-
- //
- // Set the Data Path Summary
- //
- String dataPathSummaryStr = newDataPath.dataPathSummary();
- flowObj.setDataPathSummary(dataPathSummaryStr);
-
- return true;
- }
-
- /**
- * Reconcile all flows in a set.
- *
- * @param flowObjSet the set of flows that need to be reconciliated.
- */
- private void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
- if (! flowObjSet.iterator().hasNext())
- return;
- // TODO: Not implemented/used yet.
- }
-
- /**
* Install a Flow Entry on a switch.
*
* @param mySwitch the switch to install the Flow Entry into.
@@ -877,11 +476,11 @@
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
if (enableFlowPusher) {
- return pusher.add(mySwitch, flowObj, flowEntryObj);
+ return pusher.add(mySwitch, flowObj, flowEntryObj);
} else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowObj, flowEntryObj);
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowObj, flowEntryObj);
}
}
@@ -896,11 +495,11 @@
private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
if (enableFlowPusher) {
- return pusher.add(mySwitch, flowPath, flowEntry);
+ return pusher.add(mySwitch, flowPath, flowEntry);
} else {
- return FlowSwitchOperation.installFlowEntry(
- floodlightProvider.getOFMessageFactory(),
- messageDamper, mySwitch, flowPath, flowEntry);
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowPath, flowEntry);
}
}
@@ -931,10 +530,6 @@
*/
public void pushModifiedFlowEntriesToSwitches(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
@@ -977,10 +572,6 @@
*/
public void pushModifiedFlowEntriesToDatagrid(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;
@@ -1087,10 +678,6 @@
*/
void pushModifiedFlowEntriesToDatabase(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
//
// We only add the Flow Entries to the Database Queue.
// The FlowDatabaseWriter thread is responsible for the actual writing.
@@ -1109,10 +696,6 @@
*/
private void writeModifiedFlowEntriesToDatabase(
Collection<FlowPathEntryPair> modifiedFlowEntries) {
- // TODO: For now, the pushing of Flow Entries is disabled
- if (! enableNotifications)
- return;
-
if (modifiedFlowEntries.isEmpty())
return;