Merge branch 'master' into RAMCloud
Conflicts:
conf/onos.properties
src/main/java/net/onrc/onos/flow/FlowManagerImpl.java
src/main/java/net/onrc/onos/graph/LocalTopologyEventListener.java
src/main/java/net/onrc/onos/ofcontroller/bgproute/BgpRoute.java
src/main/java/net/onrc/onos/ofcontroller/core/ILinkStorage.java
src/main/java/net/onrc/onos/ofcontroller/core/ISwitchStorage.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/DeviceStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/LinkStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImpl.java
src/main/java/net/onrc/onos/ofcontroller/floodlightlistener/NetworkGraphPublisher.java
src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
src/main/java/net/onrc/onos/ofcontroller/topology/ShortestPath.java
src/main/java/net/onrc/onos/ofcontroller/topology/web/RouteResource.java
src/test/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImplTest.java
src/test/java/net/onrc/onos/ofcontroller/core/internal/SwitchStorageImplTestBB.java
src/test/java/net/onrc/onos/ofcontroller/devicemanager/internal/DeviceStorageImplTest.java
src/test/java/net/onrc/onos/ofcontroller/topology/TopologyManagerTest.java
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
new file mode 100644
index 0000000..73c4e51
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowDatabaseOperation.java
@@ -0,0 +1,850 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import net.floodlightcontroller.util.MACAddress;
+
+import net.onrc.onos.graph.GraphDBOperation;
+
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
+import net.onrc.onos.ofcontroller.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for performing Flow-related operations on the Database.
+ */
+class FlowDatabaseOperation {
+ private final static Logger log = LoggerFactory.getLogger(FlowDatabaseOperation.class);
+
+ /**
+ * Add a flow.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowPath the Flow Path to install.
+ * @param flowId the return-by-reference Flow ID as assigned internally.
+ * @return true on success, otherwise false.
+ */
+ static boolean addFlow(FlowManager flowManager,
+ GraphDBOperation dbHandler,
+ FlowPath flowPath, FlowId flowId) {
+ IFlowPath flowObj = null;
+ boolean found = false;
+ try {
+ if ((flowObj = dbHandler.searchFlowPath(flowPath.flowId())) != null) {
+ found = true;
+ } else {
+ flowObj = dbHandler.newFlowPath();
+ }
+ } catch (Exception e) {
+ dbHandler.rollback();
+
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ String stacktrace = sw.toString();
+
+ log.error(":addFlow FlowId:{} failed: {}",
+ flowPath.flowId().toString(),
+ stacktrace);
+ return false;
+ }
+ if (flowObj == null) {
+ log.error(":addFlow FlowId:{} failed: Flow object not created",
+ flowPath.flowId().toString());
+ dbHandler.rollback();
+ return false;
+ }
+
+ //
+ // Set the Flow key:
+ // - flowId
+ //
+ flowObj.setFlowId(flowPath.flowId().toString());
+ flowObj.setType("flow");
+
+ //
+ // Set the Flow attributes:
+ // - flowPath.installerId()
+ // - flowPath.flowPathType()
+ // - flowPath.flowPathUserState()
+ // - flowPath.flowPathFlags()
+ // - flowPath.dataPath().srcPort()
+ // - flowPath.dataPath().dstPort()
+ // - flowPath.matchSrcMac()
+ // - flowPath.matchDstMac()
+ // - flowPath.matchEthernetFrameType()
+ // - flowPath.matchVlanId()
+ // - flowPath.matchVlanPriority()
+ // - flowPath.matchSrcIPv4Net()
+ // - flowPath.matchDstIPv4Net()
+ // - flowPath.matchIpProto()
+ // - flowPath.matchIpToS()
+ // - flowPath.matchSrcTcpUdpPort()
+ // - flowPath.matchDstTcpUdpPort()
+ // - flowPath.flowEntryActions()
+ //
+ flowObj.setInstallerId(flowPath.installerId().toString());
+ flowObj.setFlowPathType(flowPath.flowPathType().toString());
+ flowObj.setFlowPathUserState(flowPath.flowPathUserState().toString());
+ flowObj.setFlowPathFlags(flowPath.flowPathFlags().flags());
+ flowObj.setSrcSwitch(flowPath.dataPath().srcPort().dpid().toString());
+ flowObj.setSrcPort(flowPath.dataPath().srcPort().port().value());
+ flowObj.setDstSwitch(flowPath.dataPath().dstPort().dpid().toString());
+ flowObj.setDstPort(flowPath.dataPath().dstPort().port().value());
+ if (flowPath.flowEntryMatch().matchSrcMac()) {
+ flowObj.setMatchSrcMac(flowPath.flowEntryMatch().srcMac().toString());
+ }
+ if (flowPath.flowEntryMatch().matchDstMac()) {
+ flowObj.setMatchDstMac(flowPath.flowEntryMatch().dstMac().toString());
+ }
+ if (flowPath.flowEntryMatch().matchEthernetFrameType()) {
+ flowObj.setMatchEthernetFrameType(flowPath.flowEntryMatch().ethernetFrameType());
+ }
+ if (flowPath.flowEntryMatch().matchVlanId()) {
+ flowObj.setMatchVlanId(flowPath.flowEntryMatch().vlanId());
+ }
+ if (flowPath.flowEntryMatch().matchVlanPriority()) {
+ flowObj.setMatchVlanPriority(flowPath.flowEntryMatch().vlanPriority());
+ }
+ if (flowPath.flowEntryMatch().matchSrcIPv4Net()) {
+ flowObj.setMatchSrcIPv4Net(flowPath.flowEntryMatch().srcIPv4Net().toString());
+ }
+ if (flowPath.flowEntryMatch().matchDstIPv4Net()) {
+ flowObj.setMatchDstIPv4Net(flowPath.flowEntryMatch().dstIPv4Net().toString());
+ }
+ if (flowPath.flowEntryMatch().matchIpProto()) {
+ flowObj.setMatchIpProto(flowPath.flowEntryMatch().ipProto());
+ }
+ if (flowPath.flowEntryMatch().matchIpToS()) {
+ flowObj.setMatchIpToS(flowPath.flowEntryMatch().ipToS());
+ }
+ if (flowPath.flowEntryMatch().matchSrcTcpUdpPort()) {
+ flowObj.setMatchSrcTcpUdpPort(flowPath.flowEntryMatch().srcTcpUdpPort());
+ }
+ if (flowPath.flowEntryMatch().matchDstTcpUdpPort()) {
+ flowObj.setMatchDstTcpUdpPort(flowPath.flowEntryMatch().dstTcpUdpPort());
+ }
+ if (! flowPath.flowEntryActions().actions().isEmpty()) {
+ flowObj.setActions(flowPath.flowEntryActions().toString());
+ }
+ flowObj.setDataPathSummary(flowPath.dataPath().dataPathSummary());
+
+ if (found)
+ flowObj.setFlowPathUserState("FP_USER_MODIFY");
+ else
+ flowObj.setFlowPathUserState("FP_USER_ADD");
+
+ // Flow edges:
+ // HeadFE
+
+
+ //
+ // Flow Entries:
+ // flowPath.dataPath().flowEntries()
+ //
+ for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
+ if (addFlowEntry(flowManager, dbHandler, flowObj, flowEntry) == null) {
+ dbHandler.rollback();
+ return false;
+ }
+ }
+ dbHandler.commit();
+
+ //
+ // TODO: We need a proper Flow ID allocation mechanism.
+ //
+ flowId.setValue(flowPath.flowId().value());
+
+ return true;
+ }
+
+ /**
+ * Add a flow entry to the Network MAP.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowObj the corresponding Flow Path object for the Flow Entry.
+ * @param flowEntry the Flow Entry to install.
+ * @return the added Flow Entry object on success, otherwise null.
+ */
+ static IFlowEntry addFlowEntry(FlowManager flowManager,
+ GraphDBOperation dbHandler,
+ IFlowPath flowObj,
+ FlowEntry flowEntry) {
+ // Flow edges
+ // HeadFE (TODO)
+
+ //
+ // Assign the FlowEntry ID.
+ //
+ if ((flowEntry.flowEntryId() == null) ||
+ (flowEntry.flowEntryId().value() == 0)) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+
+ IFlowEntry flowEntryObj = null;
+ boolean found = false;
+ try {
+ if ((flowEntryObj =
+ dbHandler.searchFlowEntry(flowEntry.flowEntryId())) != null) {
+ found = true;
+ } else {
+ flowEntryObj = dbHandler.newFlowEntry();
+ }
+ } catch (Exception e) {
+ log.error(":addFlow FlowEntryId:{} failed",
+ flowEntry.flowEntryId().toString());
+ return null;
+ }
+ if (flowEntryObj == null) {
+ log.error(":addFlow FlowEntryId:{} failed: FlowEntry object not created",
+ flowEntry.flowEntryId().toString());
+ return null;
+ }
+
+ //
+ // Set the Flow Entry key:
+ // - flowEntry.flowEntryId()
+ //
+ flowEntryObj.setFlowEntryId(flowEntry.flowEntryId().toString());
+ flowEntryObj.setType("flow_entry");
+
+ //
+ // Set the Flow Entry Edges and attributes:
+ // - Switch edge
+ // - InPort edge
+ // - OutPort edge
+ //
+ // - flowEntry.dpid()
+ // - flowEntry.flowEntryUserState()
+ // - flowEntry.flowEntrySwitchState()
+ // - flowEntry.flowEntryErrorState()
+ // - flowEntry.matchInPort()
+ // - flowEntry.matchSrcMac()
+ // - flowEntry.matchDstMac()
+ // - flowEntry.matchEthernetFrameType()
+ // - flowEntry.matchVlanId()
+ // - flowEntry.matchVlanPriority()
+ // - flowEntry.matchSrcIPv4Net()
+ // - flowEntry.matchDstIPv4Net()
+ // - flowEntry.matchIpProto()
+ // - flowEntry.matchIpToS()
+ // - flowEntry.matchSrcTcpUdpPort()
+ // - flowEntry.matchDstTcpUdpPort()
+ // - flowEntry.actionOutputPort()
+ // - flowEntry.actions()
+ //
+ ISwitchObject sw = dbHandler.searchSwitch(flowEntry.dpid().toString());
+ flowEntryObj.setSwitchDpid(flowEntry.dpid().toString());
+ flowEntryObj.setSwitch(sw);
+ if (flowEntry.flowEntryMatch().matchInPort()) {
+ IPortObject inport =
+ dbHandler.searchPort(flowEntry.dpid().toString(),
+ flowEntry.flowEntryMatch().inPort().value());
+ flowEntryObj.setMatchInPort(flowEntry.flowEntryMatch().inPort().value());
+ flowEntryObj.setInPort(inport);
+ }
+ if (flowEntry.flowEntryMatch().matchSrcMac()) {
+ flowEntryObj.setMatchSrcMac(flowEntry.flowEntryMatch().srcMac().toString());
+ }
+ if (flowEntry.flowEntryMatch().matchDstMac()) {
+ flowEntryObj.setMatchDstMac(flowEntry.flowEntryMatch().dstMac().toString());
+ }
+ if (flowEntry.flowEntryMatch().matchEthernetFrameType()) {
+ flowEntryObj.setMatchEthernetFrameType(flowEntry.flowEntryMatch().ethernetFrameType());
+ }
+ if (flowEntry.flowEntryMatch().matchVlanId()) {
+ flowEntryObj.setMatchVlanId(flowEntry.flowEntryMatch().vlanId());
+ }
+ if (flowEntry.flowEntryMatch().matchVlanPriority()) {
+ flowEntryObj.setMatchVlanPriority(flowEntry.flowEntryMatch().vlanPriority());
+ }
+ if (flowEntry.flowEntryMatch().matchSrcIPv4Net()) {
+ flowEntryObj.setMatchSrcIPv4Net(flowEntry.flowEntryMatch().srcIPv4Net().toString());
+ }
+ if (flowEntry.flowEntryMatch().matchDstIPv4Net()) {
+ flowEntryObj.setMatchDstIPv4Net(flowEntry.flowEntryMatch().dstIPv4Net().toString());
+ }
+ if (flowEntry.flowEntryMatch().matchIpProto()) {
+ flowEntryObj.setMatchIpProto(flowEntry.flowEntryMatch().ipProto());
+ }
+ if (flowEntry.flowEntryMatch().matchIpToS()) {
+ flowEntryObj.setMatchIpToS(flowEntry.flowEntryMatch().ipToS());
+ }
+ if (flowEntry.flowEntryMatch().matchSrcTcpUdpPort()) {
+ flowEntryObj.setMatchSrcTcpUdpPort(flowEntry.flowEntryMatch().srcTcpUdpPort());
+ }
+ if (flowEntry.flowEntryMatch().matchDstTcpUdpPort()) {
+ flowEntryObj.setMatchDstTcpUdpPort(flowEntry.flowEntryMatch().dstTcpUdpPort());
+ }
+
+ for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
+ if (fa.actionOutput() != null) {
+ IPortObject outport =
+ dbHandler.searchPort(flowEntry.dpid().toString(),
+ fa.actionOutput().port().value());
+ flowEntryObj.setActionOutputPort(fa.actionOutput().port().value());
+ flowEntryObj.setOutPort(outport);
+ }
+ }
+ if (! flowEntry.flowEntryActions().isEmpty()) {
+ flowEntryObj.setActions(flowEntry.flowEntryActions().toString());
+ }
+
+ // TODO: Hacks with hard-coded state names!
+ if (found)
+ flowEntryObj.setUserState("FE_USER_MODIFY");
+ else
+ flowEntryObj.setUserState("FE_USER_ADD");
+ flowEntryObj.setSwitchState(flowEntry.flowEntrySwitchState().toString());
+ //
+ // TODO: Take care of the FlowEntryErrorState.
+ //
+
+ // Flow Entries edges:
+ // Flow
+ // NextFE (TODO)
+ if (! found) {
+ flowObj.addFlowEntry(flowEntryObj);
+ flowEntryObj.setFlow(flowObj);
+ }
+
+ return flowEntryObj;
+ }
+
+ /**
+ * Delete a flow entry from the Network MAP.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowObj the corresponding Flow Path object for the Flow Entry.
+ * @param flowEntry the Flow Entry to delete.
+ * @return true on success, otherwise false.
+ */
+ static boolean deleteFlowEntry(GraphDBOperation dbHandler,
+ IFlowPath flowObj,
+ FlowEntry flowEntry) {
+ IFlowEntry flowEntryObj = null;
+ try {
+ flowEntryObj = dbHandler.searchFlowEntry(flowEntry.flowEntryId());
+ } catch (Exception e) {
+ log.error(":deleteFlowEntry FlowEntryId:{} failed",
+ flowEntry.flowEntryId().toString());
+ return false;
+ }
+ //
+ // TODO: Don't print an error for now, because multiple controller
+ // instances might be deleting the same flow entry.
+ //
+ /*
+ if (flowEntryObj == null) {
+ log.error(":deleteFlowEntry FlowEntryId:{} failed: FlowEntry object not found",
+ flowEntry.flowEntryId().toString());
+ return false;
+ }
+ */
+ if (flowEntryObj == null)
+ return true;
+
+ flowObj.removeFlowEntry(flowEntryObj);
+ dbHandler.removeFlowEntry(flowEntryObj);
+ return true;
+ }
+
+ /**
+ * Delete all previously added flows.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @return true on success, otherwise false.
+ */
+ static boolean deleteAllFlows(GraphDBOperation dbHandler) {
+ List<FlowId> allFlowIds = new LinkedList<FlowId>();
+
+ // Get all Flow IDs
+ Iterable<IFlowPath> allFlowPaths = dbHandler.getAllFlowPaths();
+ for (IFlowPath flowPathObj : allFlowPaths) {
+ if (flowPathObj == null)
+ continue;
+ String flowIdStr = flowPathObj.getFlowId();
+ if (flowIdStr == null)
+ continue;
+ FlowId flowId = new FlowId(flowIdStr);
+ allFlowIds.add(flowId);
+ }
+
+ // Delete all flows one-by-one
+ for (FlowId flowId : allFlowIds) {
+ deleteFlow(dbHandler, flowId);
+ }
+
+ return true;
+ }
+
+ /**
+ * Delete a previously added flow.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to delete.
+ * @return true on success, otherwise false.
+ */
+ static boolean deleteFlow(GraphDBOperation dbHandler, FlowId flowId) {
+ IFlowPath flowObj = null;
+ try {
+ flowObj = dbHandler.searchFlowPath(flowId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":deleteFlow FlowId:{} failed", flowId.toString());
+ return false;
+ }
+ if (flowObj == null) {
+ dbHandler.commit();
+ return true; // OK: No such flow
+ }
+
+ //
+ // Remove all Flow Entries
+ //
+ Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+ for (IFlowEntry flowEntryObj : flowEntries) {
+ flowObj.removeFlowEntry(flowEntryObj);
+ dbHandler.removeFlowEntry(flowEntryObj);
+ }
+ // Remove the Flow itself
+ dbHandler.removeFlowPath(flowObj);
+ dbHandler.commit();
+
+ return true;
+ }
+
+ /**
+ * Get a previously added flow.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the flow to get.
+ * @return the Flow Path if found, otherwise null.
+ */
+ static FlowPath getFlow(GraphDBOperation dbHandler, FlowId flowId) {
+ IFlowPath flowObj = null;
+ try {
+ flowObj = dbHandler.searchFlowPath(flowId);
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getFlow FlowId:{} failed", flowId.toString());
+ return null;
+ }
+ if (flowObj == null) {
+ dbHandler.commit();
+ return null; // Flow not found
+ }
+
+ //
+ // Extract the Flow state
+ //
+ FlowPath flowPath = extractFlowPath(flowObj);
+ dbHandler.commit();
+
+ return flowPath;
+ }
+
+ /**
+ * Get all installed flows by all installers.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getAllFlowPaths failed");
+ return flowPaths;
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ dbHandler.commit();
+ return flowPaths; // No Flows found
+ }
+
+ for (IFlowPath flowObj : flowPathsObj) {
+ //
+ // Extract the Flow state
+ //
+ FlowPath flowPath = extractFlowPath(flowObj);
+ if (flowPath != null)
+ flowPaths.add(flowPath);
+ }
+
+ dbHandler.commit();
+
+ return flowPaths;
+ }
+
+ /**
+ * Get all previously added flows by a specific installer for a given
+ * data path endpoints.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param installerId the Caller ID of the installer of the flow to get.
+ * @param dataPathEndpoints the data path endpoints of the flow to get.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler,
+ CallerId installerId,
+ DataPathEndpoints dataPathEndpoints) {
+ //
+ // TODO: The implementation below is not optimal:
+ // We fetch all flows, and then return only the subset that match
+ // the query conditions.
+ // We should use the appropriate Titan/Gremlin query to filter-out
+ // the flows as appropriate.
+ //
+ ArrayList<FlowPath> allFlows = getAllFlows(dbHandler);
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+
+ if (allFlows == null)
+ return flowPaths;
+
+ for (FlowPath flow : allFlows) {
+ //
+ // TODO: String-based comparison is sub-optimal.
+ // We are using it for now to save us the extra work of
+ // implementing the "equals()" and "hashCode()" methods.
+ //
+ if (! flow.installerId().toString().equals(installerId.toString()))
+ continue;
+ if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
+ continue;
+ }
+ if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
+ continue;
+ }
+ flowPaths.add(flow);
+ }
+
+ return flowPaths;
+ }
+
+ /**
+ * Get all installed flows by all installers for given data path endpoints.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param dataPathEndpoints the data path endpoints of the flows to get.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<FlowPath> getAllFlows(GraphDBOperation dbHandler,
+ DataPathEndpoints dataPathEndpoints) {
+ //
+ // TODO: The implementation below is not optimal:
+ // We fetch all flows, and then return only the subset that match
+ // the query conditions.
+ // We should use the appropriate Titan/Gremlin query to filter-out
+ // the flows as appropriate.
+ //
+ ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
+ ArrayList<FlowPath> allFlows = getAllFlows(dbHandler);
+
+ if (allFlows == null)
+ return flowPaths;
+
+ for (FlowPath flow : allFlows) {
+ //
+ // TODO: String-based comparison is sub-optimal.
+ // We are using it for now to save us the extra work of
+ // implementing the "equals()" and "hashCode()" methods.
+ //
+ if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
+ continue;
+ }
+ if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
+ continue;
+ }
+ flowPaths.add(flow);
+ }
+
+ return flowPaths;
+ }
+
+ /**
+ * Get summary of all installed flows by all installers in a given range.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @param flowId the Flow ID of the first flow in the flow range to get.
+ * @param maxFlows the maximum number of flows to be returned.
+ * @return the Flow Paths if found, otherwise null.
+ */
+ static ArrayList<IFlowPath> getAllFlowsSummary(GraphDBOperation dbHandler,
+ FlowId flowId,
+ int maxFlows) {
+ //
+ // TODO: The implementation below is not optimal:
+ // We fetch all flows, and then return only the subset that match
+ // the query conditions.
+ // We should use the appropriate Titan/Gremlin query to filter-out
+ // the flows as appropriate.
+ //
+ ArrayList<IFlowPath> flowPathsWithoutFlowEntries =
+ getAllFlowsWithoutFlowEntries(dbHandler);
+
+ Collections.sort(flowPathsWithoutFlowEntries,
+ new Comparator<IFlowPath>() {
+ @Override
+ public int compare(IFlowPath first, IFlowPath second) {
+ long result =
+ new FlowId(first.getFlowId()).value()
+ - new FlowId(second.getFlowId()).value();
+ if (result > 0) {
+ return 1;
+ } else if (result < 0) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ }
+ );
+
+ return flowPathsWithoutFlowEntries;
+ }
+
+ /**
+ * Get all Flows information, without the associated Flow Entries.
+ *
+ * @param dbHandler the Graph Database handler to use.
+ * @return all Flows information, without the associated Flow Entries.
+ */
+ static ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries(GraphDBOperation dbHandler) {
+ Iterable<IFlowPath> flowPathsObj = null;
+ ArrayList<IFlowPath> flowPathsObjArray = new ArrayList<IFlowPath>();
+
+ // TODO: Remove this op.commit() flow, because it is not needed?
+ dbHandler.commit();
+
+ try {
+ flowPathsObj = dbHandler.getAllFlowPaths();
+ } catch (Exception e) {
+ // TODO: handle exceptions
+ dbHandler.rollback();
+ log.error(":getAllFlowPaths failed");
+ return flowPathsObjArray; // No Flows found
+ }
+ if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
+ return flowPathsObjArray; // No Flows found
+ }
+
+ for (IFlowPath flowObj : flowPathsObj)
+ flowPathsObjArray.add(flowObj);
+
+ // conn.endTx(Transaction.COMMIT);
+
+ return flowPathsObjArray;
+ }
+
+ /**
+ * Extract Flow Path State from a Titan Database Object @ref IFlowPath.
+ *
+ * @param flowObj the object to extract the Flow Path State from.
+ * @return the extracted Flow Path State.
+ */
+ private static FlowPath extractFlowPath(IFlowPath flowObj) {
+ //
+ // Extract the Flow state
+ //
+ String flowIdStr = flowObj.getFlowId();
+ String installerIdStr = flowObj.getInstallerId();
+ String flowPathType = flowObj.getFlowPathType();
+ String flowPathUserState = flowObj.getFlowPathUserState();
+ Long flowPathFlags = flowObj.getFlowPathFlags();
+ String srcSwitchStr = flowObj.getSrcSwitch();
+ Short srcPortShort = flowObj.getSrcPort();
+ String dstSwitchStr = flowObj.getDstSwitch();
+ Short dstPortShort = flowObj.getDstPort();
+
+ if ((flowIdStr == null) ||
+ (installerIdStr == null) ||
+ (flowPathType == null) ||
+ (flowPathUserState == null) ||
+ (flowPathFlags == null) ||
+ (srcSwitchStr == null) ||
+ (srcPortShort == null) ||
+ (dstSwitchStr == null) ||
+ (dstPortShort == null)) {
+ // TODO: A work-around, becauuse of some bogus database objects
+ return null;
+ }
+
+ FlowPath flowPath = new FlowPath();
+ flowPath.setFlowId(new FlowId(flowIdStr));
+ flowPath.setInstallerId(new CallerId(installerIdStr));
+ flowPath.setFlowPathType(FlowPathType.valueOf(flowPathType));
+ flowPath.setFlowPathUserState(FlowPathUserState.valueOf(flowPathUserState));
+ flowPath.setFlowPathFlags(new FlowPathFlags(flowPathFlags));
+ flowPath.dataPath().srcPort().setDpid(new Dpid(srcSwitchStr));
+ flowPath.dataPath().srcPort().setPort(new Port(srcPortShort));
+ flowPath.dataPath().dstPort().setDpid(new Dpid(dstSwitchStr));
+ flowPath.dataPath().dstPort().setPort(new Port(dstPortShort));
+ //
+ // Extract the match conditions common for all Flow Entries
+ //
+ {
+ FlowEntryMatch match = new FlowEntryMatch();
+ String matchSrcMac = flowObj.getMatchSrcMac();
+ if (matchSrcMac != null)
+ match.enableSrcMac(MACAddress.valueOf(matchSrcMac));
+ String matchDstMac = flowObj.getMatchDstMac();
+ if (matchDstMac != null)
+ match.enableDstMac(MACAddress.valueOf(matchDstMac));
+ Short matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType != null)
+ match.enableEthernetFrameType(matchEthernetFrameType);
+ Short matchVlanId = flowObj.getMatchVlanId();
+ if (matchVlanId != null)
+ match.enableVlanId(matchVlanId);
+ Byte matchVlanPriority = flowObj.getMatchVlanPriority();
+ if (matchVlanPriority != null)
+ match.enableVlanPriority(matchVlanPriority);
+ String matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net != null)
+ match.enableSrcIPv4Net(new IPv4Net(matchSrcIPv4Net));
+ String matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net != null)
+ match.enableDstIPv4Net(new IPv4Net(matchDstIPv4Net));
+ Byte matchIpProto = flowObj.getMatchIpProto();
+ if (matchIpProto != null)
+ match.enableIpProto(matchIpProto);
+ Byte matchIpToS = flowObj.getMatchIpToS();
+ if (matchIpToS != null)
+ match.enableIpToS(matchIpToS);
+ Short matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null)
+ match.enableSrcTcpUdpPort(matchSrcTcpUdpPort);
+ Short matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort != null)
+ match.enableDstTcpUdpPort(matchDstTcpUdpPort);
+
+ flowPath.setFlowEntryMatch(match);
+ }
+ //
+ // Extract the actions for the first Flow Entry
+ //
+ {
+ String actionsStr = flowObj.getActions();
+ if (actionsStr != null) {
+ FlowEntryActions flowEntryActions = new FlowEntryActions(actionsStr);
+ flowPath.setFlowEntryActions(flowEntryActions);
+ }
+ }
+
+ //
+ // Extract all Flow Entries
+ //
+ Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
+ for (IFlowEntry flowEntryObj : flowEntries) {
+ FlowEntry flowEntry = extractFlowEntry(flowEntryObj);
+ if (flowEntry == null)
+ continue;
+ flowPath.dataPath().flowEntries().add(flowEntry);
+ }
+
+ return flowPath;
+ }
+
+ /**
+ * Extract Flow Entry State from a Titan Database Object @ref IFlowEntry.
+ *
+ * @param flowEntryObj the object to extract the Flow Entry State from.
+ * @return the extracted Flow Entry State.
+ */
+ public static FlowEntry extractFlowEntry(IFlowEntry flowEntryObj) {
+ String flowEntryIdStr = flowEntryObj.getFlowEntryId();
+ String switchDpidStr = flowEntryObj.getSwitchDpid();
+ String userState = flowEntryObj.getUserState();
+ String switchState = flowEntryObj.getSwitchState();
+
+ if ((flowEntryIdStr == null) ||
+ (switchDpidStr == null) ||
+ (userState == null) ||
+ (switchState == null)) {
+ // TODO: A work-around, because of some bogus database objects
+ return null;
+ }
+
+ FlowEntry flowEntry = new FlowEntry();
+ flowEntry.setFlowEntryId(new FlowEntryId(flowEntryIdStr));
+ flowEntry.setDpid(new Dpid(switchDpidStr));
+
+ //
+ // Extract the match conditions
+ //
+ FlowEntryMatch match = new FlowEntryMatch();
+ Short matchInPort = flowEntryObj.getMatchInPort();
+ if (matchInPort != null)
+ match.enableInPort(new Port(matchInPort));
+ String matchSrcMac = flowEntryObj.getMatchSrcMac();
+ if (matchSrcMac != null)
+ match.enableSrcMac(MACAddress.valueOf(matchSrcMac));
+ String matchDstMac = flowEntryObj.getMatchDstMac();
+ if (matchDstMac != null)
+ match.enableDstMac(MACAddress.valueOf(matchDstMac));
+ Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType != null)
+ match.enableEthernetFrameType(matchEthernetFrameType);
+ Short matchVlanId = flowEntryObj.getMatchVlanId();
+ if (matchVlanId != null)
+ match.enableVlanId(matchVlanId);
+ Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
+ if (matchVlanPriority != null)
+ match.enableVlanPriority(matchVlanPriority);
+ String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net != null)
+ match.enableSrcIPv4Net(new IPv4Net(matchSrcIPv4Net));
+ String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net != null)
+ match.enableDstIPv4Net(new IPv4Net(matchDstIPv4Net));
+ Byte matchIpProto = flowEntryObj.getMatchIpProto();
+ if (matchIpProto != null)
+ match.enableIpProto(matchIpProto);
+ Byte matchIpToS = flowEntryObj.getMatchIpToS();
+ if (matchIpToS != null)
+ match.enableIpToS(matchIpToS);
+ Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null)
+ match.enableSrcTcpUdpPort(matchSrcTcpUdpPort);
+ Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort != null)
+ match.enableDstTcpUdpPort(matchDstTcpUdpPort);
+ flowEntry.setFlowEntryMatch(match);
+
+ //
+ // Extract the actions
+ //
+ FlowEntryActions actions = new FlowEntryActions();
+ String actionsStr = flowEntryObj.getActions();
+ if (actionsStr != null)
+ actions = new FlowEntryActions(actionsStr);
+ flowEntry.setFlowEntryActions(actions);
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.valueOf(userState));
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.valueOf(switchState));
+ //
+ // TODO: Take care of FlowEntryErrorState.
+ //
+ return flowEntry;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
new file mode 100644
index 0000000..0e9887a
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowEventHandler.java
@@ -0,0 +1,867 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.ofcontroller.topology.Topology;
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.topology.TopologyManager;
+import net.onrc.onos.ofcontroller.util.DataPath;
+import net.onrc.onos.ofcontroller.util.EventEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction;
+import net.onrc.onos.ofcontroller.util.FlowEntryActions;
+import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowId;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.FlowPathUserState;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A class for storing a pair of Flow Path and a Flow Entry.
+ */
+class FlowPathEntryPair {
+ protected FlowPath flowPath;
+ protected FlowEntry flowEntry;
+
+ protected FlowPathEntryPair(FlowPath flowPath, FlowEntry flowEntry) {
+ this.flowPath = flowPath;
+ this.flowEntry = flowEntry;
+ }
+}
+
+/**
+ * Class for FlowPath Maintenance.
+ * This class listens for FlowEvents to:
+ * - Maintain a local cache of the Network Topology.
+ * - Detect FlowPaths impacted by Topology change.
+ * - Recompute impacted FlowPath using cached Topology.
+ */
+class FlowEventHandler extends Thread implements IFlowEventHandlerService {
+ /** The logger. */
+ private final static Logger log = LoggerFactory.getLogger(FlowEventHandler.class);
+
+ private FlowManager flowManager; // The Flow Manager to use
+ private IDatagridService datagridService; // The Datagrid Service to use
+ private Topology topology; // The network topology
+ private Map<Long, FlowPath> allFlowPaths = new HashMap<Long, FlowPath>();
+ private Map<Long, FlowEntry> unmatchedFlowEntryAdd =
+ new HashMap<Long, FlowEntry>();
+
+ // The queue with Flow Path and Topology Element updates
+ private BlockingQueue<EventEntry<?>> networkEvents =
+ new LinkedBlockingQueue<EventEntry<?>>();
+
+ // The pending Topology, FlowPath, and FlowEntry events
+ private List<EventEntry<TopologyElement>> topologyEvents =
+ new LinkedList<EventEntry<TopologyElement>>();
+ private List<EventEntry<FlowPath>> flowPathEvents =
+ new LinkedList<EventEntry<FlowPath>>();
+ private List<EventEntry<FlowEntry>> flowEntryEvents =
+ new LinkedList<EventEntry<FlowEntry>>();
+
+ //
+ // Transient state for processing the Flow Paths:
+ // - The new Flow Paths
+ // - The Flow Paths that should be recomputed
+ // - The Flow Paths with modified Flow Entries
+ // - The Flow Entries that were updated
+ //
+ private List<FlowPath> newFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> recomputeFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPath> modifiedFlowPaths = new LinkedList<FlowPath>();
+ private List<FlowPathEntryPair> updatedFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+ private List<FlowPathEntryPair> unmatchedDeleteFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+
+
+ /**
+ * Constructor for a given Flow Manager and Datagrid Service.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param datagridService the Datagrid Service to use.
+ */
+ FlowEventHandler(FlowManager flowManager,
+ IDatagridService datagridService) {
+ this.flowManager = flowManager;
+ this.datagridService = datagridService;
+ this.topology = new Topology();
+ }
+
+ /**
+ * Get the network topology.
+ *
+ * @return the network topology.
+ */
+ protected Topology getTopology() { return this.topology; }
+
+ /**
+ * Startup processing.
+ */
+ private void startup() {
+ //
+ // Obtain the initial Topology state
+ //
+ Collection<TopologyElement> topologyElements =
+ datagridService.getAllTopologyElements();
+ for (TopologyElement topologyElement : topologyElements) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ topologyEvents.add(eventEntry);
+ }
+ //
+ // Obtain the initial Flow Path state
+ //
+ Collection<FlowPath> flowPaths = datagridService.getAllFlows();
+ for (FlowPath flowPath : flowPaths) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ flowPathEvents.add(eventEntry);
+ }
+ //
+ // Obtain the initial FlowEntry state
+ //
+ Collection<FlowEntry> flowEntries = datagridService.getAllFlowEntries();
+ for (FlowEntry flowEntry : flowEntries) {
+ EventEntry<FlowEntry> eventEntry =
+ new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
+ flowEntryEvents.add(eventEntry);
+ }
+
+ // Process the initial events (if any)
+ processEvents();
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ startup();
+
+ //
+ // The main loop
+ //
+ Collection<EventEntry<?>> collection = new LinkedList<EventEntry<?>>();
+ try {
+ while (true) {
+ EventEntry<?> eventEntry = networkEvents.take();
+ collection.add(eventEntry);
+ networkEvents.drainTo(collection);
+
+ //
+ // Demultiplex all events:
+ // - EventEntry<TopologyElement>
+ // - EventEntry<FlowPath>
+ // - EventEntry<FlowEntry>
+ //
+ for (EventEntry<?> event : collection) {
+ if (event.eventData() instanceof TopologyElement) {
+ EventEntry<TopologyElement> topologyEventEntry =
+ (EventEntry<TopologyElement>)event;
+ topologyEvents.add(topologyEventEntry);
+ } else if (event.eventData() instanceof FlowPath) {
+ EventEntry<FlowPath> flowPathEventEntry =
+ (EventEntry<FlowPath>)event;
+ flowPathEvents.add(flowPathEventEntry);
+ } else if (event.eventData() instanceof FlowEntry) {
+ EventEntry<FlowEntry> flowEntryEventEntry =
+ (EventEntry<FlowEntry>)event;
+ flowEntryEvents.add(flowEntryEventEntry);
+ }
+ }
+ collection.clear();
+
+ // Process the events (if any)
+ processEvents();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception processing Network Events: ", exception);
+ }
+ }
+
+ /**
+ * Process the events (if any)
+ */
+ private void processEvents() {
+ List<FlowPathEntryPair> modifiedFlowEntries;
+
+ if (topologyEvents.isEmpty() && flowPathEvents.isEmpty() &&
+ flowEntryEvents.isEmpty()) {
+ return; // Nothing to do
+ }
+
+ processFlowPathEvents();
+ processTopologyEvents();
+ //
+ // Add all new Flows: should be done after processing the Flow Path
+ // and Topology events.
+ //
+ for (FlowPath flowPath : newFlowPaths) {
+ allFlowPaths.put(flowPath.flowId().value(), flowPath);
+ }
+
+ processFlowEntryEvents();
+
+ // Recompute all affected Flow Paths and keep only the modified
+ for (FlowPath flowPath : recomputeFlowPaths) {
+ if (recomputeFlowPath(flowPath))
+ modifiedFlowPaths.add(flowPath);
+ }
+
+ modifiedFlowEntries = extractModifiedFlowEntries(modifiedFlowPaths);
+
+ // Assign missing Flow Entry IDs
+ assignFlowEntryId(modifiedFlowEntries);
+
+ //
+ // Push the modified Flow Entries to switches, datagrid and database
+ //
+ flowManager.pushModifiedFlowEntriesToSwitches(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatagrid(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(modifiedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(updatedFlowEntries);
+ flowManager.pushModifiedFlowEntriesToDatabase(unmatchedDeleteFlowEntries);
+
+ //
+ // Remove Flow Entries that were deleted
+ //
+ for (FlowPath flowPath : modifiedFlowPaths)
+ flowPath.dataPath().removeDeletedFlowEntries();
+
+ // Cleanup
+ topologyEvents.clear();
+ flowPathEvents.clear();
+ flowEntryEvents.clear();
+ //
+ newFlowPaths.clear();
+ recomputeFlowPaths.clear();
+ modifiedFlowPaths.clear();
+ updatedFlowEntries.clear();
+ unmatchedDeleteFlowEntries.clear();
+ }
+
+ /**
+ * Extract the modified Flow Entries.
+ */
+ private List<FlowPathEntryPair> extractModifiedFlowEntries(
+ List<FlowPath> modifiedFlowPaths) {
+ List<FlowPathEntryPair> modifiedFlowEntries =
+ new LinkedList<FlowPathEntryPair>();
+
+ // Extract only the modified Flow Entries
+ for (FlowPath flowPath : modifiedFlowPaths) {
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() ==
+ FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED) {
+ FlowPathEntryPair flowPair =
+ new FlowPathEntryPair(flowPath, flowEntry);
+ modifiedFlowEntries.add(flowPair);
+ }
+ }
+ }
+ return modifiedFlowEntries;
+ }
+
+ /**
+ * Assign the Flow Entry ID as needed.
+ */
+ private void assignFlowEntryId(List<FlowPathEntryPair> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = flowManager.getMySwitches();
+
+ //
+ // Assign the Flow Entry ID only for Flow Entries for my switches
+ //
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowEntry flowEntry = flowPair.flowEntry;
+ // Update the Flow Entries only for my switches
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+ if (! flowEntry.isValidFlowEntryId()) {
+ long id = flowManager.getNextFlowEntryId();
+ flowEntry.setFlowEntryId(new FlowEntryId(id));
+ }
+ }
+ }
+
+ /**
+ * Process the Flow Path events.
+ */
+ private void processFlowPathEvents() {
+ //
+ // Process all Flow Path events and update the appropriate state
+ //
+ for (EventEntry<FlowPath> eventEntry : flowPathEvents) {
+ FlowPath flowPath = eventEntry.eventData();
+
+ log.debug("Flow Event: {} {}", eventEntry.eventType(),
+ flowPath.toString());
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD: {
+ //
+ // Add a new Flow Path
+ //
+ if (allFlowPaths.get(flowPath.flowId().value()) != null) {
+ //
+ // TODO: What to do if the Flow Path already exists?
+ // Remove and then re-add it, or merge the info?
+ // For now, we don't have to do anything.
+ //
+ break;
+ }
+
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_SHORTEST_PATH:
+ //
+ // Reset the Data Path, in case it was set already, because
+ // we are going to recompute it anyway.
+ //
+ flowPath.flowEntries().clear();
+ recomputeFlowPaths.add(flowPath);
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ //
+ // Mark all Flow Entries for installation in the switches.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+ modifiedFlowPaths.add(flowPath);
+ break;
+ }
+ newFlowPaths.add(flowPath);
+
+ break;
+ }
+
+ case ENTRY_REMOVE: {
+ //
+ // Remove an existing Flow Path.
+ //
+ // Find the Flow Path, and mark the Flow and its Flow Entries
+ // for deletion.
+ //
+ FlowPath existingFlowPath =
+ allFlowPaths.get(flowPath.flowId().value());
+ if (existingFlowPath == null)
+ continue; // Nothing to do
+
+ existingFlowPath.setFlowPathUserState(FlowPathUserState.FP_USER_DELETE);
+ for (FlowEntry flowEntry : existingFlowPath.flowEntries()) {
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ }
+
+ allFlowPaths.remove(existingFlowPath.flowId().value());
+ modifiedFlowPaths.add(existingFlowPath);
+
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Process the Topology events.
+ */
+ private void processTopologyEvents() {
+ //
+ // Process all Topology events and update the appropriate state
+ //
+ boolean isTopologyModified = false;
+ for (EventEntry<TopologyElement> eventEntry : topologyEvents) {
+ TopologyElement topologyElement = eventEntry.eventData();
+
+ log.debug("Topology Event: {} {}", eventEntry.eventType(),
+ topologyElement.toString());
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ isTopologyModified |= topology.addTopologyElement(topologyElement);
+ break;
+ case ENTRY_REMOVE:
+ isTopologyModified |= topology.removeTopologyElement(topologyElement);
+ break;
+ }
+ }
+ if (isTopologyModified) {
+ // TODO: For now, if the topology changes, we recompute all Flows
+ recomputeFlowPaths.addAll(allFlowPaths.values());
+ }
+ }
+
+ /**
+ * Process the Flow Entry events.
+ */
+ private void processFlowEntryEvents() {
+ FlowPathEntryPair flowPair;
+ FlowPath flowPath;
+ FlowEntry updatedFlowEntry;
+
+ //
+ // Update Flow Entries with previously unmatched Flow Entry updates
+ //
+ if (! unmatchedFlowEntryAdd.isEmpty()) {
+ Map<Long, FlowEntry> remainingUpdates = new HashMap<Long, FlowEntry>();
+ for (FlowEntry flowEntry : unmatchedFlowEntryAdd.values()) {
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null)
+ continue;
+ updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ remainingUpdates.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ continue;
+ }
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ }
+ unmatchedFlowEntryAdd = remainingUpdates;
+ }
+
+ //
+ // Process all Flow Entry events and update the appropriate state
+ //
+ for (EventEntry<FlowEntry> eventEntry : flowEntryEvents) {
+ FlowEntry flowEntry = eventEntry.eventData();
+
+ log.debug("Flow Entry Event: {} {}", eventEntry.eventType(),
+ flowEntry.toString());
+
+ if ((! flowEntry.isValidFlowId()) ||
+ (! flowEntry.isValidFlowEntryId())) {
+ continue;
+ }
+
+ switch (eventEntry.eventType()) {
+ case ENTRY_ADD:
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null) {
+ // Flow Path not found: keep the entry for later matching
+ unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ break;
+ }
+ updatedFlowEntry = updateFlowEntryAdd(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ // Flow Entry not found: keep the entry for later matching
+ unmatchedFlowEntryAdd.put(flowEntry.flowEntryId().value(),
+ flowEntry);
+ break;
+ }
+ // Add the updated entry to the list of updated Flow Entries
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ break;
+
+ case ENTRY_REMOVE:
+ flowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ if (unmatchedFlowEntryAdd.remove(flowEntry.flowEntryId().value()) != null) {
+ continue; // Match found
+ }
+
+ flowPath = allFlowPaths.get(flowEntry.flowId().value());
+ if (flowPath == null) {
+ // Flow Path not found: ignore the update
+ break;
+ }
+ updatedFlowEntry = updateFlowEntryRemove(flowPath, flowEntry);
+ if (updatedFlowEntry == null) {
+ // Flow Entry not found: add to list of deleted entries
+ flowPair = new FlowPathEntryPair(flowPath, flowEntry);
+ unmatchedDeleteFlowEntries.add(flowPair);
+ break;
+ }
+ flowPair = new FlowPathEntryPair(flowPath, updatedFlowEntry);
+ updatedFlowEntries.add(flowPair);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Update a Flow Entry because of an external ENTRY_ADD event.
+ *
+ * @param flowPath the FlowPath for the Flow Entry to update.
+ * @param flowEntry the FlowEntry with the new state.
+ * @return the updated Flow Entry if found, otherwise null.
+ */
+ private FlowEntry updateFlowEntryAdd(FlowPath flowPath,
+ FlowEntry flowEntry) {
+ //
+ // Iterate over all Flow Entries and find a match.
+ //
+ for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+ if (! TopologyManager.isSameFlowEntryDataPath(localFlowEntry,
+ flowEntry)) {
+ continue;
+ }
+
+ //
+ // Local Flow Entry match found
+ //
+ if (localFlowEntry.isValidFlowEntryId()) {
+ if (localFlowEntry.flowEntryId().value() !=
+ flowEntry.flowEntryId().value()) {
+ //
+ // Find a local Flow Entry, but the Flow Entry ID doesn't
+ // match. Keep looking.
+ //
+ continue;
+ }
+ } else {
+ // Update the Flow Entry ID
+ FlowEntryId flowEntryId =
+ new FlowEntryId(flowEntry.flowEntryId().value());
+ localFlowEntry.setFlowEntryId(flowEntryId);
+ }
+
+ //
+ // Update the local Flow Entry.
+ //
+ localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
+ localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+ return localFlowEntry;
+ }
+
+ return null; // Entry not found
+ }
+
+ /**
+ * Update a Flow Entry because of an external ENTRY_REMOVE event.
+ *
+ * @param flowPath the FlowPath for the Flow Entry to update.
+ * @param flowEntry the FlowEntry with the new state.
+ * @return the updated Flow Entry if found, otherwise null.
+ */
+ private FlowEntry updateFlowEntryRemove(FlowPath flowPath,
+ FlowEntry flowEntry) {
+ //
+ // Iterate over all Flow Entries and find a match based on
+ // the Flow Entry ID.
+ //
+ for (FlowEntry localFlowEntry : flowPath.flowEntries()) {
+ if (! localFlowEntry.isValidFlowEntryId())
+ continue;
+ if (localFlowEntry.flowEntryId().value() !=
+ flowEntry.flowEntryId().value()) {
+ continue;
+ }
+ //
+ // Update the local Flow Entry.
+ //
+ localFlowEntry.setFlowEntryUserState(flowEntry.flowEntryUserState());
+ localFlowEntry.setFlowEntrySwitchState(flowEntry.flowEntrySwitchState());
+ return localFlowEntry;
+ }
+
+ return null; // Entry not found
+ }
+
+ /**
+ * Recompute a Flow Path.
+ *
+ * @param flowPath the Flow Path to recompute.
+ * @return true if the recomputed Flow Path has changed, otherwise false.
+ */
+ private boolean recomputeFlowPath(FlowPath flowPath) {
+ boolean hasChanged = false;
+
+ //
+ // Test whether the Flow Path needs to be recomputed
+ //
+ switch (flowPath.flowPathType()) {
+ case FP_TYPE_UNKNOWN:
+ return false; // Can't recompute on Unknown FlowType
+ case FP_TYPE_SHORTEST_PATH:
+ break;
+ case FP_TYPE_EXPLICIT_PATH:
+ return false; // An explicit path never changes
+ }
+
+ DataPath oldDataPath = flowPath.dataPath();
+
+ // Compute the new path
+ DataPath newDataPath = TopologyManager.computeNetworkPath(topology,
+ flowPath);
+ if (newDataPath == null) {
+ // We need the DataPath to compare the paths
+ newDataPath = new DataPath();
+ }
+ newDataPath.applyFlowPathFlags(flowPath.flowPathFlags());
+
+ //
+ // Test whether the new path is same
+ //
+ if (oldDataPath.flowEntries().size() !=
+ newDataPath.flowEntries().size()) {
+ hasChanged = true;
+ } else {
+ Iterator<FlowEntry> oldIter = oldDataPath.flowEntries().iterator();
+ Iterator<FlowEntry> newIter = newDataPath.flowEntries().iterator();
+ while (oldIter.hasNext() && newIter.hasNext()) {
+ FlowEntry oldFlowEntry = oldIter.next();
+ FlowEntry newFlowEntry = newIter.next();
+ if (! TopologyManager.isSameFlowEntryDataPath(oldFlowEntry,
+ newFlowEntry)) {
+ hasChanged = true;
+ break;
+ }
+ }
+ }
+ if (! hasChanged)
+ return hasChanged;
+
+ //
+ // Merge the changes in the path:
+ // - If a Flow Entry for a switch is in the old data path, but not
+ // in the new data path, then mark it for deletion.
+ // - If a Flow Entry for a switch is in the new data path, but not
+ // in the old data path, then mark it for addition.
+ // - If a Flow Entry for a switch is in both the old and the new
+ // data path, but it has changed, e.g., the incoming and/or outgoing
+ // port(s), then mark the old Flow Entry for deletion, and mark
+ // the new Flow Entry for addition.
+ // - If a Flow Entry for a switch is in both the old and the new
+ // data path, and it hasn't changed, then just keep it.
+ //
+ // NOTE: We use the Switch DPID of each entry to match the entries
+ //
+ Map<Long, FlowEntry> oldFlowEntriesMap = new HashMap<Long, FlowEntry>();
+ Map<Long, FlowEntry> newFlowEntriesMap = new HashMap<Long, FlowEntry>();
+ ArrayList<FlowEntry> finalFlowEntries = new ArrayList<FlowEntry>();
+ List<FlowEntry> deletedFlowEntries = new LinkedList<FlowEntry>();
+
+ // Prepare maps with the Flow Entries, so they are fast to lookup
+ for (FlowEntry flowEntry : oldDataPath.flowEntries())
+ oldFlowEntriesMap.put(flowEntry.dpid().value(), flowEntry);
+ for (FlowEntry flowEntry : newDataPath.flowEntries())
+ newFlowEntriesMap.put(flowEntry.dpid().value(), flowEntry);
+
+ //
+ // Find the old Flow Entries that should be deleted
+ //
+ for (FlowEntry oldFlowEntry : oldDataPath.flowEntries()) {
+ FlowEntry newFlowEntry =
+ newFlowEntriesMap.get(oldFlowEntry.dpid().value());
+ if (newFlowEntry == null) {
+ // The old Flow Entry should be deleted: not on the path
+ oldFlowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ oldFlowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ deletedFlowEntries.add(oldFlowEntry);
+ }
+ }
+
+ //
+ // Find the new Flow Entries that should be added or updated
+ //
+ int idx = 0;
+ for (FlowEntry newFlowEntry : newDataPath.flowEntries()) {
+ FlowEntry oldFlowEntry =
+ oldFlowEntriesMap.get(newFlowEntry.dpid().value());
+
+ if ((oldFlowEntry != null) &&
+ TopologyManager.isSameFlowEntryDataPath(oldFlowEntry,
+ newFlowEntry)) {
+ //
+ // Both Flow Entries are same
+ //
+ finalFlowEntries.add(oldFlowEntry);
+ idx++;
+ continue;
+ }
+
+ if (oldFlowEntry != null) {
+ //
+ // The old Flow Entry should be deleted: path diverges
+ //
+ oldFlowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_DELETE);
+ oldFlowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ deletedFlowEntries.add(oldFlowEntry);
+ }
+
+ //
+ // Add the new Flow Entry
+ //
+ //
+ // NOTE: Assign only the Flow ID.
+ // The Flow Entry ID is assigned later only for the Flow Entries
+ // this instance is responsible for.
+ //
+ newFlowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
+
+ // Set the incoming port matching
+ FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
+ newFlowEntry.setFlowEntryMatch(flowEntryMatch);
+ flowEntryMatch.enableInPort(newFlowEntry.inPort());
+
+ //
+ // Set the actions:
+ // If the first Flow Entry, copy the Flow Path actions to it.
+ //
+ FlowEntryActions flowEntryActions = newFlowEntry.flowEntryActions();
+ if ((idx == 0) && (flowPath.flowEntryActions() != null)) {
+ FlowEntryActions flowActions =
+ new FlowEntryActions(flowPath.flowEntryActions());
+ for (FlowEntryAction action : flowActions.actions())
+ flowEntryActions.addAction(action);
+ }
+ idx++;
+
+ //
+ // Add the outgoing port output action
+ //
+ FlowEntryAction flowEntryAction = new FlowEntryAction();
+ flowEntryAction.setActionOutput(newFlowEntry.outPort());
+ flowEntryActions.addAction(flowEntryAction);
+
+ //
+ // Set the state of the new Flow Entry
+ //
+ newFlowEntry.setFlowEntryUserState(FlowEntryUserState.FE_USER_ADD);
+ newFlowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
+ finalFlowEntries.add(newFlowEntry);
+ }
+
+ //
+ // Replace the old Flow Entries with the new Flow Entries.
+ // Note that the Flow Entries that will be deleted are added at
+ // the end.
+ //
+ finalFlowEntries.addAll(deletedFlowEntries);
+ flowPath.dataPath().setFlowEntries(finalFlowEntries);
+
+ return hasChanged;
+ }
+
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the Flow that is added.
+ */
+ @Override
+ public void notificationRecvFlowAdded(FlowPath flowPath) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the Flow that is removed.
+ */
+ @Override
+ public void notificationRecvFlowRemoved(FlowPath flowPath) {
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_REMOVE, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the Flow that is updated.
+ */
+ @Override
+ public void notificationRecvFlowUpdated(FlowPath flowPath) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<FlowPath> eventEntry =
+ new EventEntry<FlowPath>(EventEntry.Type.ENTRY_ADD, flowPath);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ @Override
+ public void notificationRecvFlowEntryAdded(FlowEntry flowEntry) {
+ EventEntry<FlowEntry> eventEntry =
+ new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntry is removed.
+ *
+ * @param flowEntry the FlowEntry that is removed.
+ */
+ @Override
+ public void notificationRecvFlowEntryRemoved(FlowEntry flowEntry) {
+ EventEntry<FlowEntry> eventEntry =
+ new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_REMOVE, flowEntry);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ @Override
+ public void notificationRecvFlowEntryUpdated(FlowEntry flowEntry) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<FlowEntry> eventEntry =
+ new EventEntry<FlowEntry>(EventEntry.Type.ENTRY_ADD, flowEntry);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ @Override
+ public void notificationRecvTopologyElementAdded(TopologyElement topologyElement) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ @Override
+ public void notificationRecvTopologyElementRemoved(TopologyElement topologyElement) {
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_REMOVE, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ @Override
+ public void notificationRecvTopologyElementUpdated(TopologyElement topologyElement) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ EventEntry<TopologyElement> eventEntry =
+ new EventEntry<TopologyElement>(EventEntry.Type.ENTRY_ADD, topologyElement);
+ networkEvents.add(eventEntry);
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index cd6f5eb..db1e588 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -1,20 +1,14 @@
package net.onrc.onos.ofcontroller.flowmanager;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -25,43 +19,20 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.graph.DBOperation;
import net.onrc.onos.graph.GraphDBManager;
+import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.core.INetMapStorage;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IPortObject;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.ISwitchObject;
-import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoRouteService;
+import net.onrc.onos.ofcontroller.floodlightlistener.INetworkGraphService;
import net.onrc.onos.ofcontroller.flowmanager.web.FlowWebRoutable;
-import net.onrc.onos.ofcontroller.routing.TopoRouteService;
-import net.onrc.onos.ofcontroller.util.CallerId;
-import net.onrc.onos.ofcontroller.util.DataPath;
-import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
-import net.onrc.onos.ofcontroller.util.Dpid;
-import net.onrc.onos.ofcontroller.util.FlowEntry;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction;
-import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
-import net.onrc.onos.ofcontroller.util.FlowEntryActions;
-import net.onrc.onos.ofcontroller.util.FlowEntryId;
-import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
-import net.onrc.onos.ofcontroller.util.FlowEntrySwitchState;
-import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
-import net.onrc.onos.ofcontroller.util.FlowId;
-import net.onrc.onos.ofcontroller.util.FlowPath;
-import net.onrc.onos.ofcontroller.util.FlowPathFlags;
-import net.onrc.onos.ofcontroller.util.IPv4Net;
-import net.onrc.onos.ofcontroller.util.Port;
-import net.onrc.onos.ofcontroller.util.SwitchPort;
+import net.onrc.onos.ofcontroller.flowprogrammer.IFlowPusherService;
+import net.onrc.onos.ofcontroller.topology.Topology;
+import net.onrc.onos.ofcontroller.util.*;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,362 +40,40 @@
* Flow Manager class for handling the network flows.
*/
public class FlowManager implements IFloodlightModule, IFlowService, INetMapStorage {
+ // flag to use FlowPusher instead of FlowSwitchOperation/MessageDamper
+ private final static boolean enableFlowPusher = false;
protected DBOperation op;
- protected IRestApiService restApi;
protected volatile IFloodlightProviderService floodlightProvider;
- protected volatile ITopoRouteService topoRouteService;
+ protected volatile IDatagridService datagridService;
+ protected IRestApiService restApi;
protected FloodlightModuleContext context;
+ protected FlowEventHandler flowEventHandler;
+ protected IFlowPusherService pusher;
+
protected OFMessageDamper messageDamper;
-
+
//
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
- public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
- public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
- public static final short PRIORITY_DEFAULT = 100;
// Flow Entry ID generation state
private static Random randomGenerator = new Random();
private static int nextFlowEntryIdPrefix = 0;
private static int nextFlowEntryIdSuffix = 0;
- private static long nextFlowEntryId = 0;
-
- // State for measurement purpose
- private static long measurementFlowId = 100000;
- private static String measurementFlowIdStr = "0x186a0"; // 100000
- private long modifiedMeasurementFlowTime = 0;
- //
/** The logger. */
- private static Logger log = LoggerFactory.getLogger(FlowManager.class);
+ private final static Logger log = LoggerFactory.getLogger(FlowManager.class);
- // The periodic task(s)
- private ScheduledExecutorService mapReaderScheduler;
- private ScheduledExecutorService shortestPathReconcileScheduler;
-
- /**
- * Periodic task for reading the Flow Entries and pushing changes
- * into the switches.
- */
- final Runnable mapReader = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flow Entries from the Network MAP: ", e);
- op.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowEntries = 0;
- int counterMyNotUpdatedFlowEntries = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowEntry> addFlowEntries =
- new LinkedList<IFlowEntry>();
- LinkedList<IFlowEntry> deleteFlowEntries =
- new LinkedList<IFlowEntry>();
-
- //
- // Fetch all Flow Entries which need to be updated and select only my Flow Entries
- // that need to be updated into the switches.
- //
- boolean processed_measurement_flow = false;
- Iterable<IFlowEntry> allFlowEntries =
- op.getAllSwitchNotUpdatedFlowEntries();
- for (IFlowEntry flowEntryObj : allFlowEntries) {
- counterAllFlowEntries++;
-
- String dpidStr = flowEntryObj.getSwitchDpid();
- if (dpidStr == null)
- continue;
- Dpid dpid = new Dpid(dpidStr);
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Ignore the entry: not my switch
-
- IFlowPath flowObj =
- op.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- //
- // NOTE: For now we process the DELETE before the ADD
- // to cover the more common scenario.
- // TODO: This is error prone and needs to be fixed!
- //
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- continue;
- if (userState.equals("FE_USER_DELETE")) {
- // An entry that needs to be deleted.
- deleteFlowEntries.add(flowEntryObj);
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- } else {
- addFlowEntries.add(flowEntryObj);
- }
- counterMyNotUpdatedFlowEntries++;
- // Code for measurement purpose
- // TODO: Commented-out for now
- /*
- {
- if (flowObj.getFlowId().equals(measurementFlowIdStr)) {
- processed_measurement_flow = true;
- }
- }
- */
- }
-
- //
- // Process the Flow Entries that need to be added
- //
- for (IFlowEntry flowEntryObj : addFlowEntries) {
- IFlowPath flowObj =
- op.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null)
- continue; // Should NOT happen
- if (flowObj.getFlowId() == null)
- continue; // Invalid entry
-
- Dpid dpid = new Dpid(flowEntryObj.getSwitchDpid());
- IOFSwitch mySwitch = mySwitches.get(dpid.value());
- if (mySwitch == null)
- continue; // Shouldn't happen
- installFlowEntry(mySwitch, flowObj, flowEntryObj);
- }
-
- //
- // Delete all Flow Entries marked for deletion from the
- // Network MAP.
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and delete the Flow Entries after the
- // Barrier message is received.
- //
- while (! deleteFlowEntries.isEmpty()) {
- IFlowEntry flowEntryObj = deleteFlowEntries.poll();
- IFlowPath flowObj =
- op.getFlowPathByFlowEntry(flowEntryObj);
- if (flowObj == null) {
- log.debug("Did not find FlowPath to be deleted");
- continue;
- }
- flowObj.removeFlowEntry(flowEntryObj);
- op.removeFlowEntry(flowEntryObj);
- }
-
- op.commit();
-
- if (processed_measurement_flow) {
- long estimatedTime =
- System.nanoTime() - modifiedMeasurementFlowTime;
- String logMsg = "MEASUREMENT: Pushed Flow delay: " +
- (double)estimatedTime / 1000000000 + " sec";
- log.debug(logMsg);
- }
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowEntries * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowEntries: " +
- counterAllFlowEntries + " MyNotUpdatedFlowEntries: " +
- counterMyNotUpdatedFlowEntries + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
- /**
- * Periodic task for reading the Flow Paths and recomputing the
- * shortest paths.
- */
- final Runnable shortestPathReconcile = new Runnable() {
- public void run() {
- try {
- runImpl();
- } catch (Exception e) {
- log.debug("Exception processing All Flows from the Network MAP: ", e);
- op.rollback();
- return;
- }
- }
-
- private void runImpl() {
- long startTime = System.nanoTime();
- int counterAllFlowPaths = 0;
- int counterMyFlowPaths = 0;
-
- if (floodlightProvider == null) {
- log.debug("FloodlightProvider service not found!");
- return;
- }
- Map<Long, IOFSwitch> mySwitches =
- floodlightProvider.getSwitches();
- if (mySwitches.isEmpty()) {
- log.trace("No switches controlled");
- return;
- }
- LinkedList<IFlowPath> deleteFlows = new LinkedList<IFlowPath>();
-
- boolean processed_measurement_flow = false;
-
- //
- // Fetch and recompute the Shortest Path for those
- // Flow Paths this controller is responsible for.
- //
- Map<Long, ?> shortestPathTopo =
- topoRouteService.prepareShortestPathTopo();
- Iterable<IFlowPath> allFlowPaths = op.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- counterAllFlowPaths++;
- if (flowPathObj == null)
- continue;
-
- String srcDpidStr = flowPathObj.getSrcSwitch();
- if (srcDpidStr == null)
- continue;
- Dpid srcDpid = new Dpid(srcDpidStr);
- //
- // Use the source DPID as a heuristic to decide
- // which controller is responsible for maintaining the
- // shortest path.
- // NOTE: This heuristic is error-prone: if the switch
- // goes away and no controller is responsible for that
- // switch, then the original Flow Path is not cleaned-up
- //
- IOFSwitch mySwitch = mySwitches.get(srcDpid.value());
- if (mySwitch == null)
- continue; // Ignore: not my responsibility
-
- // Test the Data Path Summary string
- String dataPathSummaryStr = flowPathObj.getDataPathSummary();
- if (dataPathSummaryStr == null)
- continue; // Could be invalid entry?
- if (dataPathSummaryStr.isEmpty())
- continue; // No need to maintain this flow
-
- //
- // Test whether we need to complete the Flow cleanup,
- // if the Flow has been deleted by the user.
- //
- String flowUserState = flowPathObj.getUserState();
- if ((flowUserState != null)
- && flowUserState.equals("FE_USER_DELETE")) {
- Iterable<IFlowEntry> flowEntries = flowPathObj.getFlowEntries();
- boolean empty = true; // TODO: an ugly hack
- for (IFlowEntry flowEntryObj : flowEntries) {
- empty = false;
- break;
- }
- if (empty)
- deleteFlows.add(flowPathObj);
- }
-
- // Fetch the fields needed to recompute the shortest path
- Short srcPortShort = flowPathObj.getSrcPort();
- String dstDpidStr = flowPathObj.getDstSwitch();
- Short dstPortShort = flowPathObj.getDstPort();
- Long flowPathFlagsLong = flowPathObj.getFlowPathFlags();
- if ((srcPortShort == null) ||
- (dstDpidStr == null) ||
- (dstPortShort == null) ||
- (flowPathFlagsLong == null)) {
- continue;
- }
-
- Port srcPort = new Port(srcPortShort);
- Dpid dstDpid = new Dpid(dstDpidStr);
- Port dstPort = new Port(dstPortShort);
- SwitchPort srcSwitchPort = new SwitchPort(srcDpid, srcPort);
- SwitchPort dstSwitchPort = new SwitchPort(dstDpid, dstPort);
- FlowPathFlags flowPathFlags = new FlowPathFlags(flowPathFlagsLong);
-
- counterMyFlowPaths++;
-
- //
- // NOTE: Using here the regular getShortestPath() method
- // won't work here, because that method calls internally
- // "conn.endTx(Transaction.COMMIT)", and that will
- // invalidate all handlers to the Titan database.
- // If we want to experiment with calling here
- // getShortestPath(), we need to refactor that code
- // to avoid closing the transaction.
- //
- DataPath dataPath =
- topoRouteService.getTopoShortestPath(shortestPathTopo,
- srcSwitchPort,
- dstSwitchPort);
- if (dataPath == null) {
- // We need the DataPath to compare the paths
- dataPath = new DataPath();
- dataPath.setSrcPort(srcSwitchPort);
- dataPath.setDstPort(dstSwitchPort);
- }
- dataPath.applyFlowPathFlags(flowPathFlags);
-
- String newDataPathSummaryStr = dataPath.dataPathSummary();
- if (dataPathSummaryStr.equals(newDataPathSummaryStr))
- continue; // Nothing changed
-
- reconcileFlow(flowPathObj, dataPath);
- }
-
- //
- // Delete all leftover Flows marked for deletion from the
- // Network MAP.
- //
- while (! deleteFlows.isEmpty()) {
- IFlowPath flowPathObj = deleteFlows.poll();
- op.removeFlowPath(flowPathObj);
- }
-
- topoRouteService.dropShortestPathTopo(shortestPathTopo);
-
- op.commit();
-
- if (processed_measurement_flow) {
- long estimatedTime =
- System.nanoTime() - modifiedMeasurementFlowTime;
- String logMsg = "MEASUREMENT: Pushed Flow delay: " +
- (double)estimatedTime / 1000000000 + " sec";
- log.debug(logMsg);
- }
-
- long estimatedTime = System.nanoTime() - startTime;
- double rate = 0.0;
- if (estimatedTime > 0)
- rate = ((double)counterAllFlowPaths * 1000000000) / estimatedTime;
- String logMsg = "MEASUREMENT: Processed AllFlowPaths: " +
- counterAllFlowPaths + " MyFlowPaths: " +
- counterMyFlowPaths + " in " +
- (double)estimatedTime / 1000000000 + " sec: " +
- rate + " paths/s";
- log.debug(logMsg);
- }
- };
-
+ // The queue to write Flow Entries to the database
+ private BlockingQueue<FlowPathEntryPair> flowEntriesToDatabaseQueue =
+ new LinkedBlockingQueue<FlowPathEntryPair>();
+ FlowDatabaseWriter flowDatabaseWriter;
/**
* Initialize the Flow Manager.
@@ -449,7 +98,9 @@
*/
@Override
public void close() {
- op.close();
+ datagridService.deregisterFlowEventHandlerService(flowEventHandler);
+ dbHandlerApi.close();
+ dbHandlerInner.close();
}
/**
@@ -474,9 +125,9 @@
public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
- new HashMap<Class<? extends IFloodlightService>,
- IFloodlightService>();
+ IFloodlightService> m =
+ new HashMap<Class<? extends IFloodlightService>,
+ IFloodlightService>();
m.put(IFlowService.class, this);
return m;
}
@@ -488,10 +139,12 @@
*/
@Override
public Collection<Class<? extends IFloodlightService>>
- getModuleDependencies() {
+ getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
+ l.add(INetworkGraphService.class);
+ l.add(IDatagridService.class);
l.add(IRestApiService.class);
return l;
}
@@ -506,17 +159,18 @@
throws FloodlightModuleException {
this.context = context;
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
+ datagridService = context.getServiceImpl(IDatagridService.class);
restApi = context.getServiceImpl(IRestApiService.class);
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
- // TODO: An ugly hack!
- String conf = "/tmp/cassandra.titan";
- this.init("titan", conf);
-
- mapReaderScheduler = Executors.newScheduledThreadPool(1);
- shortestPathReconcileScheduler = Executors.newScheduledThreadPool(1);
+ if (enableFlowPusher) {
+ pusher = context.getServiceImpl(IFlowPusherService.class);
+ } else {
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
+
+ this.init("","");
}
/**
@@ -524,7 +178,8 @@
*
* @return the next Flow Entry ID to use.
*/
- private synchronized long getNextFlowEntryId() {
+ @Override
+ public synchronized long getNextFlowEntryId() {
//
// Generate the next Flow Entry ID.
// NOTE: For now, the higher 32 bits are random, and
@@ -550,172 +205,55 @@
@Override
public void startUp(FloodlightModuleContext context) {
restApi.addRestletRoutable(new FlowWebRoutable());
-
+
// Initialize the Flow Entry ID generator
nextFlowEntryIdPrefix = randomGenerator.nextInt();
-
- mapReaderScheduler.scheduleAtFixedRate(
- mapReader, 3, 3, TimeUnit.SECONDS);
- shortestPathReconcileScheduler.scheduleAtFixedRate(
- shortestPathReconcile, 3, 3, TimeUnit.SECONDS);
+
+ //
+ // The thread to write to the database
+ //
+ flowDatabaseWriter = new FlowDatabaseWriter(this,
+ flowEntriesToDatabaseQueue);
+ flowDatabaseWriter.start();
+
+ //
+ // The Flow Event Handler thread:
+ // - create
+ // - register with the Datagrid Service
+ // - startup
+ //
+ flowEventHandler = new FlowEventHandler(this, datagridService);
+ datagridService.registerFlowEventHandlerService(flowEventHandler);
+ flowEventHandler.start();
}
/**
* Add a flow.
*
- * Internally, ONOS will automatically register the installer for
- * receiving Flow Path Notifications for that path.
- *
* @param flowPath the Flow Path to install.
* @param flowId the return-by-reference Flow ID as assigned internally.
- * @param dataPathSummaryStr the data path summary string if the added
- * flow will be maintained internally, otherwise null.
* @return true on success, otherwise false.
*/
@Override
- public boolean addFlow(FlowPath flowPath, FlowId flowId,
- String dataPathSummaryStr) {
- /*
- * TODO: Commented-out for now
- if (flowPath.flowId().value() == measurementFlowId) {
- modifiedMeasurementFlowTime = System.nanoTime();
- }
- */
-
- IFlowPath flowObj = null;
- boolean found = false;
- try {
- if ((flowObj = op.searchFlowPath(flowPath.flowId()))
- != null) {
- log.debug("Adding FlowPath with FlowId {}: found existing FlowPath",
- flowPath.flowId().toString());
- found = true;
- } else {
- flowObj = op.newFlowPath();
- log.debug("Adding FlowPath with FlowId {}: creating new FlowPath",
- flowPath.flowId().toString());
+ public boolean addFlow(FlowPath flowPath, FlowId flowId) {
+ //
+ // NOTE: We need to explicitly initialize some of the state,
+ // in case the application didn't do it.
+ //
+ for (FlowEntry flowEntry : flowPath.flowEntries()) {
+ if (flowEntry.flowEntrySwitchState() ==
+ FlowEntrySwitchState.FE_SWITCH_UNKNOWN) {
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_NOT_UPDATED);
}
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
-
- StringWriter sw = new StringWriter();
- e.printStackTrace(new PrintWriter(sw));
- String stacktrace = sw.toString();
-
- log.error(":addFlow FlowId:{} failed: {}",
- flowPath.flowId().toString(),
- stacktrace);
- }
- if (flowObj == null) {
- log.error(":addFlow FlowId:{} failed: Flow object not created",
- flowPath.flowId().toString());
- op.rollback();
- return false;
+ if (! flowEntry.isValidFlowId())
+ flowEntry.setFlowId(new FlowId(flowPath.flowId().value()));
}
- //
- // Set the Flow key:
- // - flowId
- //
- flowObj.setFlowId(flowPath.flowId().toString());
- flowObj.setType("flow");
-
- //
- // Set the Flow attributes:
- // - flowPath.installerId()
- // - flowPath.flowPathFlags()
- // - flowPath.dataPath().srcPort()
- // - flowPath.dataPath().dstPort()
- // - flowPath.matchSrcMac()
- // - flowPath.matchDstMac()
- // - flowPath.matchEthernetFrameType()
- // - flowPath.matchVlanId()
- // - flowPath.matchVlanPriority()
- // - flowPath.matchSrcIPv4Net()
- // - flowPath.matchDstIPv4Net()
- // - flowPath.matchIpProto()
- // - flowPath.matchIpToS()
- // - flowPath.matchSrcTcpUdpPort()
- // - flowPath.matchDstTcpUdpPort()
- // - flowPath.flowEntryActions()
- //
- flowObj.setInstallerId(flowPath.installerId().toString());
- flowObj.setFlowPathFlags(flowPath.flowPathFlags().flags());
- flowObj.setSrcSwitch(flowPath.dataPath().srcPort().dpid().toString());
- flowObj.setSrcPort(flowPath.dataPath().srcPort().port().value());
- flowObj.setDstSwitch(flowPath.dataPath().dstPort().dpid().toString());
- flowObj.setDstPort(flowPath.dataPath().dstPort().port().value());
- if (flowPath.flowEntryMatch().matchSrcMac()) {
- flowObj.setMatchSrcMac(flowPath.flowEntryMatch().srcMac().toString());
+ if (FlowDatabaseOperation.addFlow(this, dbHandlerApi, flowPath, flowId)) {
+ datagridService.notificationSendFlowAdded(flowPath);
+ return true;
}
- if (flowPath.flowEntryMatch().matchDstMac()) {
- flowObj.setMatchDstMac(flowPath.flowEntryMatch().dstMac().toString());
- }
- if (flowPath.flowEntryMatch().matchEthernetFrameType()) {
- flowObj.setMatchEthernetFrameType(flowPath.flowEntryMatch().ethernetFrameType());
- }
- if (flowPath.flowEntryMatch().matchVlanId()) {
- flowObj.setMatchVlanId(flowPath.flowEntryMatch().vlanId());
- }
- if (flowPath.flowEntryMatch().matchVlanPriority()) {
- flowObj.setMatchVlanPriority(flowPath.flowEntryMatch().vlanPriority());
- }
- if (flowPath.flowEntryMatch().matchSrcIPv4Net()) {
- flowObj.setMatchSrcIPv4Net(flowPath.flowEntryMatch().srcIPv4Net().toString());
- }
- if (flowPath.flowEntryMatch().matchDstIPv4Net()) {
- flowObj.setMatchDstIPv4Net(flowPath.flowEntryMatch().dstIPv4Net().toString());
- }
- if (flowPath.flowEntryMatch().matchIpProto()) {
- flowObj.setMatchIpProto(flowPath.flowEntryMatch().ipProto());
- }
- if (flowPath.flowEntryMatch().matchIpToS()) {
- flowObj.setMatchIpToS(flowPath.flowEntryMatch().ipToS());
- }
- if (flowPath.flowEntryMatch().matchSrcTcpUdpPort()) {
- flowObj.setMatchSrcTcpUdpPort(flowPath.flowEntryMatch().srcTcpUdpPort());
- }
- if (flowPath.flowEntryMatch().matchDstTcpUdpPort()) {
- flowObj.setMatchDstTcpUdpPort(flowPath.flowEntryMatch().dstTcpUdpPort());
- }
- if (! flowPath.flowEntryActions().actions().isEmpty()) {
- flowObj.setActions(flowPath.flowEntryActions().toString());
- }
-
- if (dataPathSummaryStr != null) {
- flowObj.setDataPathSummary(dataPathSummaryStr);
- } else {
- flowObj.setDataPathSummary("");
- }
-
- if (found)
- flowObj.setUserState("FE_USER_MODIFY");
- else
- flowObj.setUserState("FE_USER_ADD");
-
- // Flow edges:
- // HeadFE
-
-
- //
- // Flow Entries:
- // flowPath.dataPath().flowEntries()
- //
- for (FlowEntry flowEntry : flowPath.dataPath().flowEntries()) {
- if (addFlowEntry(flowObj, flowEntry) == null) {
- op.rollback();
- return false;
- }
- }
- op.commit();
-
- //
- // TODO: We need a proper Flow ID allocation mechanism.
- //
- flowId.setValue(flowPath.flowId().value());
-
- return true;
+ return false;
}
/**
@@ -726,150 +264,20 @@
* @return the added Flow Entry object on success, otherwise null.
*/
private IFlowEntry addFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
- // Flow edges
- // HeadFE (TODO)
+ return FlowDatabaseOperation.addFlowEntry(this, dbHandlerInner,
+ flowObj, flowEntry);
+ }
- //
- // Assign the FlowEntry ID.
- //
- if ((flowEntry.flowEntryId() == null) ||
- (flowEntry.flowEntryId().value() == 0)) {
- long id = getNextFlowEntryId();
- flowEntry.setFlowEntryId(new FlowEntryId(id));
- }
-
- IFlowEntry flowEntryObj = null;
- boolean found = false;
- try {
- if ((flowEntryObj =
- op.searchFlowEntry(flowEntry.flowEntryId())) != null) {
- log.debug("Adding FlowEntry with FlowEntryId {}: found existing FlowEntry",
- flowEntry.flowEntryId().toString());
- found = true;
- } else {
- flowEntryObj = op.newFlowEntry();
- log.debug("Adding FlowEntry with FlowEntryId {}: creating new FlowEntry",
- flowEntry.flowEntryId().toString());
- }
- } catch (Exception e) {
- log.error(":addFlow FlowEntryId:{} failed",
- flowEntry.flowEntryId().toString());
- return null;
- }
- if (flowEntryObj == null) {
- log.error(":addFlow FlowEntryId:{} failed: FlowEntry object not created",
- flowEntry.flowEntryId().toString());
- return null;
- }
-
- //
- // Set the Flow Entry key:
- // - flowEntry.flowEntryId()
- //
- flowEntryObj.setFlowEntryId(flowEntry.flowEntryId().toString());
- flowEntryObj.setType("flow_entry");
-
- //
- // Set the Flow Entry Edges and attributes:
- // - Switch edge
- // - InPort edge
- // - OutPort edge
- //
- // - flowEntry.dpid()
- // - flowEntry.flowEntryUserState()
- // - flowEntry.flowEntrySwitchState()
- // - flowEntry.flowEntryErrorState()
- // - flowEntry.matchInPort()
- // - flowEntry.matchSrcMac()
- // - flowEntry.matchDstMac()
- // - flowEntry.matchEthernetFrameType()
- // - flowEntry.matchVlanId()
- // - flowEntry.matchVlanPriority()
- // - flowEntry.matchSrcIPv4Net()
- // - flowEntry.matchDstIPv4Net()
- // - flowEntry.matchIpProto()
- // - flowEntry.matchIpToS()
- // - flowEntry.matchSrcTcpUdpPort()
- // - flowEntry.matchDstTcpUdpPort()
- // - flowEntry.actionOutputPort()
- // - flowEntry.actions()
- //
- ISwitchObject sw = op.searchSwitch(flowEntry.dpid().toString());
- flowEntryObj.setSwitchDpid(flowEntry.dpid().toString());
- flowEntryObj.setSwitch(sw);
- if (flowEntry.flowEntryMatch().matchInPort()) {
- IPortObject inport =
- op.searchPort(flowEntry.dpid().toString(),
- flowEntry.flowEntryMatch().inPort().value());
- flowEntryObj.setMatchInPort(flowEntry.flowEntryMatch().inPort().value());
- flowEntryObj.setInPort(inport);
- }
- if (flowEntry.flowEntryMatch().matchSrcMac()) {
- flowEntryObj.setMatchSrcMac(flowEntry.flowEntryMatch().srcMac().toString());
- }
- if (flowEntry.flowEntryMatch().matchDstMac()) {
- flowEntryObj.setMatchDstMac(flowEntry.flowEntryMatch().dstMac().toString());
- }
- if (flowEntry.flowEntryMatch().matchEthernetFrameType()) {
- flowEntryObj.setMatchEthernetFrameType(flowEntry.flowEntryMatch().ethernetFrameType());
- }
- if (flowEntry.flowEntryMatch().matchVlanId()) {
- flowEntryObj.setMatchVlanId(flowEntry.flowEntryMatch().vlanId());
- }
- if (flowEntry.flowEntryMatch().matchVlanPriority()) {
- flowEntryObj.setMatchVlanPriority(flowEntry.flowEntryMatch().vlanPriority());
- }
- if (flowEntry.flowEntryMatch().matchSrcIPv4Net()) {
- flowEntryObj.setMatchSrcIPv4Net(flowEntry.flowEntryMatch().srcIPv4Net().toString());
- }
- if (flowEntry.flowEntryMatch().matchDstIPv4Net()) {
- flowEntryObj.setMatchDstIPv4Net(flowEntry.flowEntryMatch().dstIPv4Net().toString());
- }
- if (flowEntry.flowEntryMatch().matchIpProto()) {
- flowEntryObj.setMatchIpProto(flowEntry.flowEntryMatch().ipProto());
- }
- if (flowEntry.flowEntryMatch().matchIpToS()) {
- flowEntryObj.setMatchIpToS(flowEntry.flowEntryMatch().ipToS());
- }
- if (flowEntry.flowEntryMatch().matchSrcTcpUdpPort()) {
- flowEntryObj.setMatchSrcTcpUdpPort(flowEntry.flowEntryMatch().srcTcpUdpPort());
- }
- if (flowEntry.flowEntryMatch().matchDstTcpUdpPort()) {
- flowEntryObj.setMatchDstTcpUdpPort(flowEntry.flowEntryMatch().dstTcpUdpPort());
- }
-
- for (FlowEntryAction fa : flowEntry.flowEntryActions().actions()) {
- if (fa.actionOutput() != null) {
- IPortObject outport =
- op.searchPort(flowEntry.dpid().toString(),
- fa.actionOutput().port().value());
- flowEntryObj.setActionOutputPort(fa.actionOutput().port().value());
- flowEntryObj.setOutPort(outport);
- }
- }
- if (! flowEntry.flowEntryActions().isEmpty()) {
- flowEntryObj.setActions(flowEntry.flowEntryActions().toString());
- }
-
- // TODO: Hacks with hard-coded state names!
- if (found)
- flowEntryObj.setUserState("FE_USER_MODIFY");
- else
- flowEntryObj.setUserState("FE_USER_ADD");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- //
- // TODO: Take care of the FlowEntryErrorState.
- //
-
- // Flow Entries edges:
- // Flow
- // NextFE (TODO)
- if (! found) {
- flowObj.addFlowEntry(flowEntryObj);
- flowEntryObj.setFlow(flowObj);
- }
-
- return flowEntryObj;
+ /**
+ * Delete a flow entry from the Network MAP.
+ *
+ * @param flowObj the corresponding Flow Path object for the Flow Entry.
+ * @param flowEntry the Flow Entry to delete.
+ * @return true on success, otherwise false.
+ */
+ private boolean deleteFlowEntry(IFlowPath flowObj, FlowEntry flowEntry) {
+ return FlowDatabaseOperation.deleteFlowEntry(dbHandlerInner,
+ flowObj, flowEntry);
}
/**
@@ -879,64 +287,11 @@
*/
@Override
public boolean deleteAllFlows() {
- List<Thread> threads = new LinkedList<Thread>();
- final ConcurrentLinkedQueue<FlowId> concurrentAllFlowIds =
- new ConcurrentLinkedQueue<FlowId>();
-
- // Get all Flow IDs
- Iterable<IFlowPath> allFlowPaths = op.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- if (flowPathObj == null)
- continue;
- String flowIdStr = flowPathObj.getFlowId();
- if (flowIdStr == null)
- continue;
- FlowId flowId = new FlowId(flowIdStr);
- concurrentAllFlowIds.add(flowId);
+ if (FlowDatabaseOperation.deleteAllFlows(dbHandlerApi)) {
+ datagridService.notificationSendAllFlowsRemoved();
+ return true;
}
-
- // Delete all flows one-by-one
- for (FlowId flowId : concurrentAllFlowIds)
- deleteFlow(flowId);
-
- /*
- * TODO: A faster mechanism to delete the Flow Paths by using
- * a number of threads. Commented-out for now.
- */
- /*
- //
- // Create the threads to delete the Flow Paths
- //
- for (int i = 0; i < 10; i++) {
- Thread thread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- FlowId flowId = concurrentAllFlowIds.poll();
- if (flowId == null)
- return;
- deleteFlow(flowId);
- }
- }}, "Delete All Flow Paths");
- threads.add(thread);
- }
-
- // Start processing
- for (Thread thread : threads) {
- thread.start();
- }
-
- // Wait for all threads to complete
- for (Thread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.debug("Exception waiting for a thread to delete a Flow Path: ", e);
- }
- }
- */
-
- return true;
+ return false;
}
/**
@@ -947,130 +302,11 @@
*/
@Override
public boolean deleteFlow(FlowId flowId) {
- /*
- * TODO: Commented-out for now
- if (flowId.value() == measurementFlowId) {
- modifiedMeasurementFlowTime = System.nanoTime();
+ if (FlowDatabaseOperation.deleteFlow(dbHandlerApi, flowId)) {
+ datagridService.notificationSendFlowRemoved(flowId);
+ return true;
}
- */
-
- IFlowPath flowObj = null;
- //
- // We just mark the entries for deletion,
- // and let the switches remove each individual entry after
- // it has been removed from the switches.
- //
- try {
- if ((flowObj = op.searchFlowPath(flowId))
- != null) {
- log.debug("Deleting FlowPath with FlowId {}: found existing FlowPath",
- flowId.toString());
- } else {
- log.debug("Deleting FlowPath with FlowId {}: FlowPath not found",
- flowId.toString());
- }
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
- log.error(":deleteFlow FlowId:{} failed", flowId.toString());
- }
- if (flowObj == null) {
- op.commit();
- return true; // OK: No such flow
- }
-
- //
- // Find and mark for deletion all Flow Entries,
- // and the Flow itself.
- //
- flowObj.setUserState("FE_USER_DELETE");
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- boolean empty = true; // TODO: an ugly hack
- for (IFlowEntry flowEntryObj : flowEntries) {
- empty = false;
- // flowObj.removeFlowEntry(flowEntryObj);
- // conn.utils().removeFlowEntry(conn, flowEntryObj);
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- // Remove from the database empty flows
- if (empty)
- op.removeFlowPath(flowObj);
- op.commit();
-
- return true;
- }
-
- /**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearAllFlows() {
- List<FlowId> allFlowIds = new LinkedList<FlowId>();
-
- // Get all Flow IDs
- Iterable<IFlowPath> allFlowPaths = op.getAllFlowPaths();
- for (IFlowPath flowPathObj : allFlowPaths) {
- if (flowPathObj == null)
- continue;
- String flowIdStr = flowPathObj.getFlowId();
- if (flowIdStr == null)
- continue;
- FlowId flowId = new FlowId(flowIdStr);
- allFlowIds.add(flowId);
- }
-
- // Clear all flows one-by-one
- for (FlowId flowId : allFlowIds) {
- clearFlow(flowId);
- }
-
- return true;
- }
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- @Override
- public boolean clearFlow(FlowId flowId) {
- IFlowPath flowObj = null;
- try {
- if ((flowObj = op.searchFlowPath(flowId))
- != null) {
- log.debug("Clearing FlowPath with FlowId {}: found existing FlowPath",
- flowId.toString());
- } else {
- log.debug("Clearing FlowPath with FlowId {}: FlowPath not found",
- flowId.toString());
- }
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
- log.error(":clearFlow FlowId:{} failed", flowId.toString());
- }
- if (flowObj == null) {
- op.commit();
- return true; // OK: No such flow
- }
-
- //
- // Remove all Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- for (IFlowEntry flowEntryObj : flowEntries) {
- flowObj.removeFlowEntry(flowEntryObj);
- op.removeFlowEntry(flowEntryObj);
- }
- // Remove the Flow itself
- op.removeFlowPath(flowObj);
- op.commit();
-
- return true;
+ return false;
}
/**
@@ -1081,33 +317,17 @@
*/
@Override
public FlowPath getFlow(FlowId flowId) {
- IFlowPath flowObj = null;
- try {
- if ((flowObj = op.searchFlowPath(flowId))
- != null) {
- log.debug("Get FlowPath with FlowId {}: found existing FlowPath",
- flowId.toString());
- } else {
- log.debug("Get FlowPath with FlowId {}: FlowPath not found",
- flowId.toString());
- }
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
- log.error(":getFlow FlowId:{} failed", flowId.toString());
- }
- if (flowObj == null) {
- op.commit();
- return null; // Flow not found
- }
+ return FlowDatabaseOperation.getFlow(dbHandlerApi, flowId);
+ }
- //
- // Extract the Flow state
- //
- FlowPath flowPath = extractFlowPath(flowObj);
- op.commit();
-
- return flowPath;
+ /**
+ * Get all installed flows by all installers.
+ *
+ * @return the Flow Paths if found, otherwise null.
+ */
+ @Override
+ public ArrayList<FlowPath> getAllFlows() {
+ return FlowDatabaseOperation.getAllFlows(dbHandlerApi);
}
/**
@@ -1121,45 +341,8 @@
@Override
public ArrayList<FlowPath> getAllFlows(CallerId installerId,
DataPathEndpoints dataPathEndpoints) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> allFlows = getAllFlows();
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
-
- if (allFlows == null) {
- log.debug("Get FlowPaths for installerId{} and dataPathEndpoints{}: no FlowPaths found", installerId, dataPathEndpoints);
- return flowPaths;
- }
-
- for (FlowPath flow : allFlows) {
- //
- // TODO: String-based comparison is sub-optimal.
- // We are using it for now to save us the extra work of
- // implementing the "equals()" and "hashCode()" methods.
- //
- if (! flow.installerId().toString().equals(installerId.toString()))
- continue;
- if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
- continue;
- }
- if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
- continue;
- }
- flowPaths.add(flow);
- }
-
- if (flowPaths.isEmpty()) {
- log.debug("Get FlowPaths for installerId{} and dataPathEndpoints{}: no FlowPaths found", installerId, dataPathEndpoints);
- } else {
- log.debug("Get FlowPaths for installerId{} and dataPathEndpoints{}: FlowPaths are found", installerId, dataPathEndpoints);
- }
-
- return flowPaths;
+ return FlowDatabaseOperation.getAllFlows(dbHandlerApi, installerId,
+ dataPathEndpoints);
}
/**
@@ -1170,43 +353,8 @@
*/
@Override
public ArrayList<FlowPath> getAllFlows(DataPathEndpoints dataPathEndpoints) {
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
- ArrayList<FlowPath> allFlows = getAllFlows();
-
- if (allFlows == null) {
- log.debug("Get FlowPaths for dataPathEndpoints{}: no FlowPaths found", dataPathEndpoints);
- return flowPaths;
- }
-
- for (FlowPath flow : allFlows) {
- //
- // TODO: String-based comparison is sub-optimal.
- // We are using it for now to save us the extra work of
- // implementing the "equals()" and "hashCode()" methods.
- //
- if (! flow.dataPath().srcPort().toString().equals(dataPathEndpoints.srcPort().toString())) {
- continue;
- }
- if (! flow.dataPath().dstPort().toString().equals(dataPathEndpoints.dstPort().toString())) {
- continue;
- }
- flowPaths.add(flow);
- }
-
- if (flowPaths.isEmpty()) {
- log.debug("Get FlowPaths for dataPathEndpoints{}: no FlowPaths found", dataPathEndpoints);
- } else {
- log.debug("Get FlowPaths for dataPathEndpoints{}: FlowPaths are found", dataPathEndpoints);
- }
-
- return flowPaths;
+ return FlowDatabaseOperation.getAllFlows(dbHandlerApi,
+ dataPathEndpoints);
}
/**
@@ -1217,344 +365,19 @@
* @return the Flow Paths if found, otherwise null.
*/
@Override
- public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows) {
-
- //
- // TODO: The implementation below is not optimal:
- // We fetch all flows, and then return only the subset that match
- // the query conditions.
- // We should use the appropriate Titan/Gremlin query to filter-out
- // the flows as appropriate.
- //
- //ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
-
- ArrayList<IFlowPath> flowPathsWithoutFlowEntries = getAllFlowsWithoutFlowEntries();
-
- Collections.sort(flowPathsWithoutFlowEntries,
- new Comparator<IFlowPath>(){
- @Override
- public int compare(IFlowPath first, IFlowPath second) {
- // TODO Auto-generated method stub
- long result = new FlowId(first.getFlowId()).value()
- - new FlowId(second.getFlowId()).value();
- if (result > 0) return 1;
- else if (result < 0) return -1;
- else return 0;
- }
- }
- );
-
- return flowPathsWithoutFlowEntries;
-
- /*
- ArrayList<FlowPath> allFlows = getAllFlows();
-
- if (allFlows == null) {
- log.debug("Get FlowPathsSummary for {} {}: no FlowPaths found", flowId, maxFlows);
- return flowPaths;
- }
-
- Collections.sort(allFlows);
-
- for (FlowPath flow : allFlows) {
- flow.setFlowEntryMatch(null);
-
- // start from desired flowId
- if (flow.flowId().value() < flowId.value()) {
- continue;
- }
-
- // Summarize by making null flow entry fields that are not relevant to report
- for (FlowEntry flowEntry : flow.dataPath().flowEntries()) {
- flowEntry.setFlowEntryActions(null);
- flowEntry.setFlowEntryMatch(null);
- }
-
- flowPaths.add(flow);
- if (maxFlows != 0 && flowPaths.size() >= maxFlows) {
- break;
- }
- }
-
- if (flowPaths.isEmpty()) {
- log.debug("Get FlowPathsSummary {} {}: no FlowPaths found", flowId, maxFlows);
- } else {
- log.debug("Get FlowPathsSummary for {} {}: FlowPaths were found", flowId, maxFlows);
- }
-
- return flowPaths;
- */
+ public ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId,
+ int maxFlows) {
+ return FlowDatabaseOperation.getAllFlowsSummary(dbHandlerApi, flowId,
+ maxFlows);
}
/**
- * Get all installed flows by all installers.
- *
- * @return the Flow Paths if found, otherwise null.
- */
- @Override
- public ArrayList<FlowPath> getAllFlows() {
- Iterable<IFlowPath> flowPathsObj = null;
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
-
- try {
- if ((flowPathsObj = op.getAllFlowPaths()) != null) {
- log.debug("Get all FlowPaths: found FlowPaths");
- } else {
- log.debug("Get all FlowPaths: no FlowPaths found");
- }
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
- log.error(":getAllFlowPaths failed");
- }
- if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
- op.commit();
- return flowPaths; // No Flows found
- }
-
- for (IFlowPath flowObj : flowPathsObj) {
- //
- // Extract the Flow state
- //
- FlowPath flowPath = extractFlowPath(flowObj);
- if (flowPath != null)
- flowPaths.add(flowPath);
- }
-
- op.commit();
-
- return flowPaths;
- }
-
- /**
* Get all Flows information, without the associated Flow Entries.
*
* @return all Flows information, without the associated Flow Entries.
*/
public ArrayList<IFlowPath> getAllFlowsWithoutFlowEntries() {
- Iterable<IFlowPath> flowPathsObj = null;
- ArrayList<IFlowPath> flowPathsObjArray = new ArrayList<IFlowPath>();
- ArrayList<FlowPath> flowPaths = new ArrayList<FlowPath>();
-
- op.commit();
-
- try {
- if ((flowPathsObj = op.getAllFlowPaths()) != null) {
- log.debug("Get all FlowPaths: found FlowPaths");
- } else {
- log.debug("Get all FlowPaths: no FlowPaths found");
- }
- } catch (Exception e) {
- // TODO: handle exceptions
- op.rollback();
- log.error(":getAllFlowPaths failed");
- }
- if ((flowPathsObj == null) || (flowPathsObj.iterator().hasNext() == false)) {
- return new ArrayList<IFlowPath>(); // No Flows found
- }
-
- for (IFlowPath flowObj : flowPathsObj){
- flowPathsObjArray.add(flowObj);
- }
- /*
- for (IFlowPath flowObj : flowPathsObj) {
- //
- // Extract the Flow state
- //
- FlowPath flowPath = extractFlowPath(flowObj);
- if (flowPath != null)
- flowPaths.add(flowPath);
- }
- */
-
- //conn.endTx(Transaction.COMMIT);
-
- return flowPathsObjArray;
- }
-
- /**
- * Extract Flow Path State from a Titan Database Object @ref IFlowPath.
- *
- * @param flowObj the object to extract the Flow Path State from.
- * @return the extracted Flow Path State.
- */
- private FlowPath extractFlowPath(IFlowPath flowObj) {
- //
- // Extract the Flow state
- //
- String flowIdStr = flowObj.getFlowId();
- String installerIdStr = flowObj.getInstallerId();
- Long flowPathFlags = flowObj.getFlowPathFlags();
- String srcSwitchStr = flowObj.getSrcSwitch();
- Short srcPortShort = flowObj.getSrcPort();
- String dstSwitchStr = flowObj.getDstSwitch();
- Short dstPortShort = flowObj.getDstPort();
-
- if ((flowIdStr == null) ||
- (installerIdStr == null) ||
- (flowPathFlags == null) ||
- (srcSwitchStr == null) ||
- (srcPortShort == null) ||
- (dstSwitchStr == null) ||
- (dstPortShort == null)) {
- // TODO: A work-around, becauuse of some bogus database objects
- return null;
- }
-
- FlowPath flowPath = new FlowPath();
- flowPath.setFlowId(new FlowId(flowIdStr));
- flowPath.setInstallerId(new CallerId(installerIdStr));
- flowPath.setFlowPathFlags(new FlowPathFlags(flowPathFlags));
- flowPath.dataPath().srcPort().setDpid(new Dpid(srcSwitchStr));
- flowPath.dataPath().srcPort().setPort(new Port(srcPortShort));
- flowPath.dataPath().dstPort().setDpid(new Dpid(dstSwitchStr));
- flowPath.dataPath().dstPort().setPort(new Port(dstPortShort));
- //
- // Extract the match conditions common for all Flow Entries
- //
- {
- FlowEntryMatch match = new FlowEntryMatch();
- String matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null)
- match.enableSrcMac(MACAddress.valueOf(matchSrcMac));
- String matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null)
- match.enableDstMac(MACAddress.valueOf(matchDstMac));
- Short matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null)
- match.enableEthernetFrameType(matchEthernetFrameType);
- Short matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null)
- match.enableVlanId(matchVlanId);
- Byte matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null)
- match.enableVlanPriority(matchVlanPriority);
- String matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null)
- match.enableSrcIPv4Net(new IPv4Net(matchSrcIPv4Net));
- String matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null)
- match.enableDstIPv4Net(new IPv4Net(matchDstIPv4Net));
- Byte matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null)
- match.enableIpProto(matchIpProto);
- Byte matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null)
- match.enableIpToS(matchIpToS);
- Short matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null)
- match.enableSrcTcpUdpPort(matchSrcTcpUdpPort);
- Short matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null)
- match.enableDstTcpUdpPort(matchDstTcpUdpPort);
-
- flowPath.setFlowEntryMatch(match);
- }
- //
- // Extract the actions for the first Flow Entry
- //
- {
- String actionsStr = flowObj.getActions();
- if (actionsStr != null) {
- FlowEntryActions flowEntryActions = new FlowEntryActions(actionsStr);
- flowPath.setFlowEntryActions(flowEntryActions);
- }
- }
-
- //
- // Extract all Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- for (IFlowEntry flowEntryObj : flowEntries) {
- FlowEntry flowEntry = extractFlowEntry(flowEntryObj);
- if (flowEntry == null)
- continue;
- flowPath.dataPath().flowEntries().add(flowEntry);
- }
-
- return flowPath;
- }
-
- /**
- * Extract Flow Entry State from a Titan Database Object @ref IFlowEntry.
- *
- * @param flowEntryObj the object to extract the Flow Entry State from.
- * @return the extracted Flow Entry State.
- */
- private FlowEntry extractFlowEntry(IFlowEntry flowEntryObj) {
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- String switchDpidStr = flowEntryObj.getSwitchDpid();
- String userState = flowEntryObj.getUserState();
- String switchState = flowEntryObj.getSwitchState();
-
- if ((flowEntryIdStr == null) ||
- (switchDpidStr == null) ||
- (userState == null) ||
- (switchState == null)) {
- // TODO: A work-around, becauuse of some bogus database objects
- return null;
- }
-
- FlowEntry flowEntry = new FlowEntry();
- flowEntry.setFlowEntryId(new FlowEntryId(flowEntryIdStr));
- flowEntry.setDpid(new Dpid(switchDpidStr));
-
- //
- // Extract the match conditions
- //
- FlowEntryMatch match = new FlowEntryMatch();
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null)
- match.enableInPort(new Port(matchInPort));
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac != null)
- match.enableSrcMac(MACAddress.valueOf(matchSrcMac));
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac != null)
- match.enableDstMac(MACAddress.valueOf(matchDstMac));
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null)
- match.enableEthernetFrameType(matchEthernetFrameType);
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId != null)
- match.enableVlanId(matchVlanId);
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority != null)
- match.enableVlanPriority(matchVlanPriority);
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null)
- match.enableSrcIPv4Net(new IPv4Net(matchSrcIPv4Net));
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null)
- match.enableDstIPv4Net(new IPv4Net(matchDstIPv4Net));
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto != null)
- match.enableIpProto(matchIpProto);
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS != null)
- match.enableIpToS(matchIpToS);
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null)
- match.enableSrcTcpUdpPort(matchSrcTcpUdpPort);
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null)
- match.enableDstTcpUdpPort(matchDstTcpUdpPort);
- flowEntry.setFlowEntryMatch(match);
-
- //
- // Extract the actions
- //
- FlowEntryActions actions = new FlowEntryActions();
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- actions = new FlowEntryActions(actionsStr);
- flowEntry.setFlowEntryActions(actions);
- flowEntry.setFlowEntryUserState(FlowEntryUserState.valueOf(userState));
- flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.valueOf(switchState));
- //
- // TODO: Take care of FlowEntryErrorState.
- //
- return flowEntry;
+ return FlowDatabaseOperation.getAllFlowsWithoutFlowEntries(dbHandlerApi);
}
/**
@@ -1573,108 +396,29 @@
// Instead, let the Flow reconciliation thread take care of it.
//
- // We need the DataPath to populate the Network MAP
- DataPath dataPath = new DataPath();
- dataPath.setSrcPort(flowPath.dataPath().srcPort());
- dataPath.setDstPort(flowPath.dataPath().dstPort());
-
- //
- // Prepare the computed Flow Path
- //
- FlowPath computedFlowPath = new FlowPath();
- computedFlowPath.setFlowId(new FlowId(flowPath.flowId().value()));
- computedFlowPath.setInstallerId(new CallerId(flowPath.installerId().value()));
- computedFlowPath.setFlowPathFlags(new FlowPathFlags(flowPath.flowPathFlags().flags()));
- computedFlowPath.setDataPath(dataPath);
- computedFlowPath.setFlowEntryMatch(new FlowEntryMatch(flowPath.flowEntryMatch()));
- computedFlowPath.setFlowEntryActions(new FlowEntryActions(flowPath.flowEntryActions()));
-
FlowId flowId = new FlowId();
- String dataPathSummaryStr = dataPath.dataPathSummary();
- if (! addFlow(computedFlowPath, flowId, dataPathSummaryStr))
+ if (! addFlow(flowPath, flowId))
return null;
- // TODO: Mark the flow for maintenance purpose
-
- return (computedFlowPath);
+ return (flowPath);
}
/**
- * Reconcile a flow.
+ * Get the collection of my switches.
*
- * @param flowObj the flow that needs to be reconciliated.
- * @param newDataPath the new data path to use.
- * @return true on success, otherwise false.
+ * @return the collection of my switches.
*/
- public boolean reconcileFlow(IFlowPath flowObj, DataPath newDataPath) {
- Map<Long, IOFSwitch> mySwitches = floodlightProvider.getSwitches();
-
- //
- // Set the incoming port matching and the outgoing port output
- // actions for each flow entry.
- //
- int idx = 0;
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- // Set the incoming port matching
- FlowEntryMatch flowEntryMatch = new FlowEntryMatch();
- flowEntry.setFlowEntryMatch(flowEntryMatch);
- flowEntryMatch.enableInPort(flowEntry.inPort());
-
- //
- // Set the actions
- //
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- // If the first Flow Entry, copy the Flow Path actions to it
- //
- if (idx == 0) {
- String actionsStr = flowObj.getActions();
- if (actionsStr != null) {
- FlowEntryActions flowActions = new FlowEntryActions(actionsStr);
- for (FlowEntryAction action : flowActions.actions())
- flowEntryActions.addAction(action);
- }
- }
- idx++;
- //
- // Add the outgoing port output action
- //
- FlowEntryAction flowEntryAction = new FlowEntryAction();
- flowEntryAction.setActionOutput(flowEntry.outPort());
- flowEntryActions.addAction(flowEntryAction);
- }
-
- //
- // Remove the old Flow Entries, and add the new Flow Entries
- //
- Iterable<IFlowEntry> flowEntries = flowObj.getFlowEntries();
- LinkedList<IFlowEntry> deleteFlowEntries = new LinkedList<IFlowEntry>();
- for (IFlowEntry flowEntryObj : flowEntries) {
- flowEntryObj.setUserState("FE_USER_DELETE");
- flowEntryObj.setSwitchState("FE_SWITCH_NOT_UPDATED");
- }
- for (FlowEntry flowEntry : newDataPath.flowEntries()) {
- addFlowEntry(flowObj, flowEntry);
- }
-
- //
- // Set the Data Path Summary
- //
- String dataPathSummaryStr = newDataPath.dataPathSummary();
- flowObj.setDataPathSummary(dataPathSummaryStr);
-
- return true;
+ public Map<Long, IOFSwitch> getMySwitches() {
+ return floodlightProvider.getSwitches();
}
/**
- * Reconcile all flows in a set.
+ * Get the network topology.
*
- * @param flowObjSet the set of flows that need to be reconciliated.
+ * @return the network topology.
*/
- public void reconcileFlows(Iterable<IFlowPath> flowObjSet) {
- if (! flowObjSet.iterator().hasNext())
- return;
- // TODO: Not implemented/used yet.
+ public Topology getTopology() {
+ return flowEventHandler.getTopology();
}
/**
@@ -1685,313 +429,15 @@
* @param flowEntryObj the flow entry object to install.
* @return true on success, otherwise false.
*/
- public boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
+ private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
- String flowEntryIdStr = flowEntryObj.getFlowEntryId();
- if (flowEntryIdStr == null)
- return false;
- FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
- String userState = flowEntryObj.getUserState();
- if (userState == null)
- return false;
-
- //
- // Create the Open Flow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod) floodlightProvider.getOFMessageFactory()
- .getMessage(OFType.FLOW_MOD);
- long cookie = flowEntryId.value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (userState.equals("FE_USER_ADD")) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (userState.equals("FE_USER_MODIFY")) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (userState.equals("FE_USER_DELETE")) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntryId.toString(), userState);
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- Short matchInPort = flowEntryObj.getMatchInPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- String matchSrcMac = flowEntryObj.getMatchSrcMac();
- if (matchSrcMac == null)
- matchSrcMac = flowObj.getMatchSrcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- String matchDstMac = flowEntryObj.getMatchDstMac();
- if (matchDstMac == null)
- matchDstMac = flowObj.getMatchDstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType == null)
- matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryObj.getMatchVlanId();
- if (matchVlanId == null)
- matchVlanId = flowObj.getMatchVlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
- if (matchVlanPriority == null)
- matchVlanPriority = flowObj.getMatchVlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net == null)
- matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net == null)
- matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryObj.getMatchIpProto();
- if (matchIpProto == null)
- matchIpProto = flowObj.getMatchIpProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryObj.getMatchIpToS();
- if (matchIpToS == null)
- matchIpToS = flowObj.getMatchIpToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort == null)
- matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort == null)
- matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = null;
- String actionsStr = flowEntryObj.getActions();
- if (actionsStr != null)
- flowEntryActions = new FlowEntryActions(actionsStr);
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- log.debug("MEASUREMENT: Installing flow entry " + userState +
- " into switch DPID: " +
- mySwitch.getStringId() +
- " flowEntryId: " + flowEntryId.toString() +
- " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
- " inPort: " + matchInPort + " outPort: " + actionOutputPort
- );
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
-
- return true;
+ if (enableFlowPusher) {
+ return pusher.add(mySwitch, flowObj, flowEntryObj);
+ } else {
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowObj, flowEntryObj);
+ }
}
/**
@@ -2002,311 +448,15 @@
* @param flowEntry the flow entry to install.
* @return true on success, otherwise false.
*/
- public boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+ private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
- //
- // Create the OpenFlow Flow Modification Entry to push
- //
- OFFlowMod fm = (OFFlowMod) floodlightProvider.getOFMessageFactory()
- .getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId().toString(),
- flowEntry.flowEntryUserState());
- return false;
- }
-
- //
- // Fetch the match conditions.
- //
- // NOTE: The Flow matching conditions common for all Flow Entries are
- // used ONLY if a Flow Entry does NOT have the corresponding matching
- // condition set.
- //
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
- FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
- FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
-
- // Match the Incoming Port
- Port matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.value());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if ((matchSrcMac == null) && (flowPathMatch != null)) {
- matchSrcMac = flowPathMatch.srcMac();
- }
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if ((matchDstMac == null) && (flowPathMatch != null)) {
- matchDstMac = flowPathMatch.dstMac();
- }
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
- matchEthernetFrameType = flowPathMatch.ethernetFrameType();
- }
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if ((matchVlanId == null) && (flowPathMatch != null)) {
- matchVlanId = flowPathMatch.vlanId();
- }
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if ((matchVlanPriority == null) && (flowPathMatch != null)) {
- matchVlanPriority = flowPathMatch.vlanPriority();
- }
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
- matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
- }
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
- matchDstIPv4Net = flowPathMatch.dstIPv4Net();
- }
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if ((matchIpProto == null) && (flowPathMatch != null)) {
- matchIpProto = flowPathMatch.ipProto();
- }
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if ((matchIpToS == null) && (flowPathMatch != null)) {
- matchIpToS = flowPathMatch.ipToS();
- }
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
- matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
- }
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
- matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
- }
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- //
- // Fetch the actions
- //
- Short actionOutputPort = null;
- List<OFAction> openFlowActions = new ArrayList<OFAction>();
- int actionsLen = 0;
- FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- for (FlowEntryAction action : flowEntryActions.actions()) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort = actionOutput.port().value();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa =
- new OFActionOutput(actionOutput.port().value(),
- (short)0xffff);
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan() == true) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().value(),
- actionEnqueue.queueId());
- openFlowActions.add(ofa);
- actionsLen += ofa.getLength();
- }
- }
-
- fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
- .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
- .setPriority(PRIORITY_DEFAULT)
- .setBufferId(OFPacketOut.BUFFER_ID_NONE)
- .setCookie(cookie)
- .setCommand(flowModCommand)
- .setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
- (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort != null)
- fm.setOutPort(actionOutputPort);
- }
-
- //
- // TODO: Set the following flag
- // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- // See method ForwardingBase::pushRoute()
- //
-
- //
- // Write the message to the switch
- //
- try {
- messageDamper.write(mySwitch, fm, null);
- mySwitch.flush();
- //
- // TODO: We should use the OpenFlow Barrier mechanism
- // to check for errors, and update the SwitchState
- // for a flow entry after the Barrier message is
- // is received.
- //
- // TODO: The FlowEntry Object in Titan should be set
- // to FE_SWITCH_UPDATED.
- //
- } catch (IOException e) {
- log.error("Failure writing flow mod from network map", e);
- return false;
- }
- return true;
+ if (enableFlowPusher) {
+ return pusher.add(mySwitch, flowPath, flowEntry);
+ } else {
+ return FlowSwitchOperation.installFlowEntry(
+ floodlightProvider.getOFMessageFactory(),
+ messageDamper, mySwitch, flowPath, flowEntry);
+ }
}
/**
@@ -2317,7 +467,7 @@
* @param flowEntry the flow entry to remove.
* @return true on success, otherwise false.
*/
- public boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
+ private boolean removeFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
//
// The installFlowEntry() method implements both installation
@@ -2327,37 +477,266 @@
}
/**
- * Install a Flow Entry on a remote controller.
+ * Push modified Flow Entries to switches.
*
- * TODO: We need it now: Jono
- * - For now it will make a REST call to the remote controller.
- * - Internally, it needs to know the name of the remote controller.
+ * NOTE: Only the Flow Entries to switches controlled by this instance
+ * are pushed.
*
- * @param flowPath the flow path for the flow entry to install.
- * @param flowEntry the flow entry to install.
- * @return true on success, otherwise false.
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
*/
- public boolean installRemoteFlowEntry(FlowPath flowPath,
- FlowEntry flowEntry) {
- // TODO: We need it now: Jono
- // - For now it will make a REST call to the remote controller.
- // - Internally, it needs to know the name of the remote controller.
- return true;
+ public void pushModifiedFlowEntriesToSwitches(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowPath flowPath = flowPair.flowPath;
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+ if (mySwitch == null)
+ continue;
+
+ log.debug("Pushing Flow Entry To Switch: {}", flowEntry.toString());
+
+ //
+ // Install the Flow Entry into the switch
+ //
+ if (! installFlowEntry(mySwitch, flowPath, flowEntry)) {
+ String logMsg = "Cannot install Flow Entry " +
+ flowEntry.flowEntryId() +
+ " from Flow Path " + flowPath.flowId() +
+ " on switch " + flowEntry.dpid();
+ log.error(logMsg);
+ continue;
+ }
+
+ //
+ // NOTE: Here we assume that the switch has been
+ // successfully updated.
+ //
+ flowEntry.setFlowEntrySwitchState(FlowEntrySwitchState.FE_SWITCH_UPDATED);
+ }
}
/**
- * Remove a flow entry on a remote controller.
+ * Push modified Flow Entries to the datagrid.
*
- * @param flowPath the flow path for the flow entry to remove.
- * @param flowEntry the flow entry to remove.
- * @return true on success, otherwise false.
+ * @param modifiedFlowEntries the collection of modified Flow Entries.
*/
- public boolean removeRemoteFlowEntry(FlowPath flowPath,
- FlowEntry flowEntry) {
+ public void pushModifiedFlowEntriesToDatagrid(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ IOFSwitch mySwitch = mySwitches.get(flowEntry.dpid().value());
+
+ //
+ // TODO: For now Flow Entries are removed by all instances,
+ // even if this Flow Entry is not for our switches.
+ //
+ // This is needed to handle the case a switch going down:
+ // it has no Master controller instance, hence no
+ // controller instance will cleanup its flow entries.
+ // This is sub-optimal: we need to elect a controller
+ // instance to handle the cleanup of such orphaned flow
+ // entries.
+ //
+ if (mySwitch == null) {
+ if (flowEntry.flowEntryUserState() !=
+ FlowEntryUserState.FE_USER_DELETE) {
+ continue;
+ }
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+ }
+
+ log.debug("Pushing Flow Entry To Datagrid: {}", flowEntry.toString());
+ //
+ // Write the Flow Entry to the Datagrid
+ //
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ if (mySwitch == null)
+ break; // Install only flow entries for my switches
+ datagridService.notificationSendFlowEntryAdded(flowEntry);
+ break;
+ case FE_USER_MODIFY:
+ if (mySwitch == null)
+ break; // Install only flow entries for my switches
+ datagridService.notificationSendFlowEntryUpdated(flowEntry);
+ break;
+ case FE_USER_DELETE:
+ datagridService.notificationSendFlowEntryRemoved(flowEntry.flowEntryId());
+ break;
+ }
+ }
+ }
+
+ /**
+ * Class to implement writing to the database in a separate thread.
+ */
+ class FlowDatabaseWriter extends Thread {
+ private FlowManager flowManager;
+ private BlockingQueue<FlowPathEntryPair> blockingQueue;
+
+ /**
+ * Constructor.
+ *
+ * @param flowManager the Flow Manager to use.
+ * @param blockingQueue the blocking queue to use.
+ */
+ FlowDatabaseWriter(FlowManager flowManager,
+ BlockingQueue<FlowPathEntryPair> blockingQueue) {
+ this.flowManager = flowManager;
+ this.blockingQueue = blockingQueue;
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ //
+ // The main loop
+ //
+ Collection<FlowPathEntryPair> collection =
+ new LinkedList<FlowPathEntryPair>();
+ try {
+ while (true) {
+ FlowPathEntryPair entryPair = blockingQueue.take();
+ collection.add(entryPair);
+ blockingQueue.drainTo(collection);
+ flowManager.writeModifiedFlowEntriesToDatabase(collection);
+ collection.clear();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception writing to the Database: ", exception);
+ }
+ }
+ }
+
+ /**
+ * Push Flow Entries to the Network MAP.
+ *
+ * NOTE: The Flow Entries are pushed only on the instance responsible
+ * for the first switch. This is to avoid database errors when multiple
+ * instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries to push.
+ */
+ void pushModifiedFlowEntriesToDatabase(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
//
- // The installRemoteFlowEntry() method implements both installation
- // and removal of flow entries.
+ // We only add the Flow Entries to the Database Queue.
+ // The FlowDatabaseWriter thread is responsible for the actual writing.
//
- return (installRemoteFlowEntry(flowPath, flowEntry));
+ flowEntriesToDatabaseQueue.addAll(modifiedFlowEntries);
+ }
+
+ /**
+ * Write Flow Entries to the Network MAP.
+ *
+ * NOTE: The Flow Entries are written only on the instance responsible
+ * for the first switch. This is to avoid database errors when multiple
+ * instances are writing Flow Entries for the same Flow Path.
+ *
+ * @param modifiedFlowEntries the collection of Flow Entries to write.
+ */
+ private void writeModifiedFlowEntriesToDatabase(
+ Collection<FlowPathEntryPair> modifiedFlowEntries) {
+ if (modifiedFlowEntries.isEmpty())
+ return;
+
+ Map<Long, IOFSwitch> mySwitches = getMySwitches();
+
+ for (FlowPathEntryPair flowPair : modifiedFlowEntries) {
+ FlowPath flowPath = flowPair.flowPath;
+ FlowEntry flowEntry = flowPair.flowEntry;
+
+ if (! flowEntry.isValidFlowEntryId())
+ continue;
+
+ //
+ // Push the changes only on the instance responsible for the
+ // first switch.
+ //
+ Dpid srcDpid = flowPath.dataPath().srcPort().dpid();
+ IOFSwitch mySrcSwitch = mySwitches.get(srcDpid.value());
+ if (mySrcSwitch == null)
+ continue;
+
+ log.debug("Pushing Flow Entry To Database: {}", flowEntry.toString());
+ //
+ // Write the Flow Entry to the Network Map
+ //
+ // NOTE: We try a number of times, in case somehow some other
+ // instances are writing at the same time.
+ // Apparently, if other instances are writing at the same time
+ // this will trigger an error.
+ //
+ for (int i = 0; i < 6; i++) {
+ try {
+ //
+ // Find the Flow Path in the Network MAP.
+ //
+ // NOTE: The Flow Path might not be found if the Flow was
+ // just removed by some other controller instance.
+ //
+ IFlowPath flowObj =
+ dbHandlerInner.searchFlowPath(flowEntry.flowId());
+ if (flowObj == null) {
+ String logMsg = "Cannot find Network MAP entry for Flow Path " + flowEntry.flowId();
+ log.error(logMsg);
+ break;
+ }
+
+ // Write the Flow Entry
+ switch (flowEntry.flowEntryUserState()) {
+ case FE_USER_ADD:
+ // FALLTHROUGH
+ case FE_USER_MODIFY:
+ if (addFlowEntry(flowObj, flowEntry) == null) {
+ String logMsg = "Cannot write to Network MAP Flow Entry " +
+ flowEntry.flowEntryId() +
+ " from Flow Path " + flowEntry.flowId() +
+ " on switch " + flowEntry.dpid();
+ log.error(logMsg);
+ }
+ break;
+ case FE_USER_DELETE:
+ if (deleteFlowEntry(flowObj, flowEntry) == false) {
+ String logMsg = "Cannot remove from Network MAP Flow Entry " +
+ flowEntry.flowEntryId() +
+ " from Flow Path " + flowEntry.flowId() +
+ " on switch " + flowEntry.dpid();
+ log.error(logMsg);
+ }
+ break;
+ }
+
+ // Commit to the database
+ dbHandlerInner.commit();
+ break; // Success
+
+ } catch (Exception e) {
+ log.debug("Exception writing Flow Entry to Network MAP: ", e);
+ dbHandlerInner.rollback();
+ // Wait a bit (random value [1ms, 20ms] and try again
+ int delay = 1 + randomGenerator.nextInt() % 20;
+ try {
+ Thread.sleep(delay);
+ } catch (Exception e0) {
+ }
+ }
+ }
+ }
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
new file mode 100644
index 0000000..8bed120
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowSwitchOperation.java
@@ -0,0 +1,689 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.util.MACAddress;
+import net.floodlightcontroller.util.OFMessageDamper;
+
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.util.*;
+import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
+
+import org.openflow.protocol.OFFlowMod;
+import org.openflow.protocol.OFMatch;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPort;
+import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class for performing Flow-related operations on the Switch.
+ */
+class FlowSwitchOperation {
+ private final static Logger log = LoggerFactory.getLogger(FlowSwitchOperation.class);
+ //
+ // TODO: Values copied from elsewhere (class LearningSwitch).
+ // The local copy should go away!
+ //
+ public static final short PRIORITY_DEFAULT = 100;
+ public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
+ public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
+
+ // TODO add Pusher instance member
+ //
+
+ /**
+ * Install a Flow Entry on a switch.
+ *
+ * @param messageFactory the OpenFlow message factory to use.
+ * @param messageDamper the OpenFlow message damper to use.
+ * @param mySwitch the switch to install the Flow Entry into.
+ * @param flowObj the flow path object for the flow entry to install.
+ * @param flowEntryObj the flow entry object to install.
+ * @return true on success, otherwise false.
+ */
+ static boolean installFlowEntry(BasicFactory messageFactory,
+ OFMessageDamper messageDamper,
+ IOFSwitch mySwitch, IFlowPath flowObj,
+ IFlowEntry flowEntryObj) {
+ String flowEntryIdStr = flowEntryObj.getFlowEntryId();
+ if (flowEntryIdStr == null)
+ return false;
+ FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
+ String userState = flowEntryObj.getUserState();
+ if (userState == null)
+ return false;
+
+ //
+ // Create the Open Flow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntryId.value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (userState.equals("FE_USER_ADD")) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (userState.equals("FE_USER_MODIFY")) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (userState.equals("FE_USER_DELETE")) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntryId.toString(), userState);
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+
+ // Match the Incoming Port
+ Short matchInPort = flowEntryObj.getMatchInPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ String matchSrcMac = flowEntryObj.getMatchSrcMac();
+ if (matchSrcMac == null)
+ matchSrcMac = flowObj.getMatchSrcMac();
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ String matchDstMac = flowEntryObj.getMatchDstMac();
+ if (matchDstMac == null)
+ matchDstMac = flowObj.getMatchDstMac();
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType == null)
+ matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryObj.getMatchVlanId();
+ if (matchVlanId == null)
+ matchVlanId = flowObj.getMatchVlanId();
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
+ if (matchVlanPriority == null)
+ matchVlanPriority = flowObj.getMatchVlanPriority();
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net == null)
+ matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net == null)
+ matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryObj.getMatchIpProto();
+ if (matchIpProto == null)
+ matchIpProto = flowObj.getMatchIpProto();
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryObj.getMatchIpToS();
+ if (matchIpToS == null)
+ matchIpToS = flowObj.getMatchIpToS();
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort == null)
+ matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort == null)
+ matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = null;
+ String actionsStr = flowEntryObj.getActions();
+ if (actionsStr != null)
+ flowEntryActions = new FlowEntryActions(actionsStr);
+ else
+ flowEntryActions = new FlowEntryActions();
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa =
+ new OFActionOutput(actionOutput.port().value(),
+ (short)0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa =
+ new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa =
+ new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa =
+ new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa =
+ new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa =
+ new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa =
+ new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa =
+ new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa =
+ new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa =
+ new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa =
+ new OFActionEnqueue(actionEnqueue.port().value(),
+ actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+ .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+ .setPriority(PRIORITY_DEFAULT)
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+ .setCookie(cookie)
+ .setCommand(flowModCommand)
+ .setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
+ (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // TODO: Set the following flag
+ // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ // See method ForwardingBase::pushRoute()
+ //
+
+ //
+ // Write the message to the switch
+ //
+ log.debug("MEASUREMENT: Installing flow entry " + userState +
+ " into switch DPID: " +
+ mySwitch.getStringId() +
+ " flowEntryId: " + flowEntryId.toString() +
+ " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
+ " inPort: " + matchInPort + " outPort: " + actionOutputPort
+ );
+ try {
+ messageDamper.write(mySwitch, fm, null);
+ mySwitch.flush();
+ //
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message is
+ // is received.
+ //
+ flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
+ } catch (IOException e) {
+ log.error("Failure writing flow mod from network map", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Install a Flow Entry on a switch.
+ *
+ * @param messageFactory the OpenFlow message factory to use.
+ * @maram messageDamper the OpenFlow message damper to use.
+ * @param mySwitch the switch to install the Flow Entry into.
+ * @param flowPath the flow path for the flow entry to install.
+ * @param flowEntry the flow entry to install.
+ * @return true on success, otherwise false.
+ */
+ static boolean installFlowEntry(BasicFactory messageFactory,
+ OFMessageDamper messageDamper,
+ IOFSwitch mySwitch, FlowPath flowPath,
+ FlowEntry flowEntry) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod)messageFactory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId().toString(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if ((matchSrcMac == null) && (flowPathMatch != null)) {
+ matchSrcMac = flowPathMatch.srcMac();
+ }
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if ((matchDstMac == null) && (flowPathMatch != null)) {
+ matchDstMac = flowPathMatch.dstMac();
+ }
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
+ matchEthernetFrameType = flowPathMatch.ethernetFrameType();
+ }
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if ((matchVlanId == null) && (flowPathMatch != null)) {
+ matchVlanId = flowPathMatch.vlanId();
+ }
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if ((matchVlanPriority == null) && (flowPathMatch != null)) {
+ matchVlanPriority = flowPathMatch.vlanPriority();
+ }
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
+ matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
+ }
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
+ matchDstIPv4Net = flowPathMatch.dstIPv4Net();
+ }
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if ((matchIpProto == null) && (flowPathMatch != null)) {
+ matchIpProto = flowPathMatch.ipProto();
+ }
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if ((matchIpToS == null) && (flowPathMatch != null)) {
+ matchIpToS = flowPathMatch.ipToS();
+ }
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
+ }
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
+ }
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa =
+ new OFActionOutput(actionOutput.port().value(),
+ (short)0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa =
+ new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa =
+ new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa =
+ new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa =
+ new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa =
+ new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa =
+ new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa =
+ new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa =
+ new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa =
+ new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa =
+ new OFActionEnqueue(actionEnqueue.port().value(),
+ actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+ .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+ .setPriority(PRIORITY_DEFAULT)
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+ .setCookie(cookie)
+ .setCommand(flowModCommand)
+ .setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
+ (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // TODO: Set the following flag
+ // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ // See method ForwardingBase::pushRoute()
+ //
+
+ //
+ // Write the message to the switch
+ //
+ log.debug("MEASUREMENT: Installing flow entry " +
+ flowEntry.flowEntryUserState() +
+ " into switch DPID: " +
+ mySwitch.getStringId() +
+ " flowEntryId: " + flowEntry.flowEntryId().toString() +
+ " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
+ " inPort: " + matchInPort + " outPort: " + actionOutputPort
+ );
+ try {
+ messageDamper.write(mySwitch, fm, null);
+ mySwitch.flush();
+ //
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message is
+ // is received.
+ //
+ // TODO: The FlowEntry Object in Titan should be set
+ // to FE_SWITCH_UPDATED.
+ //
+ } catch (IOException e) {
+ log.error("Failure writing flow mod from network map", e);
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
new file mode 100644
index 0000000..78562e1
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowEventHandlerService.java
@@ -0,0 +1,73 @@
+package net.onrc.onos.ofcontroller.flowmanager;
+
+import net.onrc.onos.ofcontroller.topology.TopologyElement;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+
+/**
+ * Interface for providing Flow Event Handler Service to other modules.
+ */
+public interface IFlowEventHandlerService {
+ /**
+ * Receive a notification that a Flow is added.
+ *
+ * @param flowPath the Flow that is added.
+ */
+ void notificationRecvFlowAdded(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is removed.
+ *
+ * @param flowPath the Flow that is removed.
+ */
+ void notificationRecvFlowRemoved(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a Flow is updated.
+ *
+ * @param flowPath the Flow that is updated.
+ */
+ void notificationRecvFlowUpdated(FlowPath flowPath);
+
+ /**
+ * Receive a notification that a FlowEntry is added.
+ *
+ * @param flowEntry the FlowEntry that is added.
+ */
+ void notificationRecvFlowEntryAdded(FlowEntry flowEntry);
+
+ /**
+ * Receive a notification that a FlowEntry is removed.
+ *
+ * @param flowEntry the FlowEntry that is removed.
+ */
+ void notificationRecvFlowEntryRemoved(FlowEntry flowEntry);
+
+ /**
+ * Receive a notification that a FlowEntry is updated.
+ *
+ * @param flowEntry the FlowEntry that is updated.
+ */
+ void notificationRecvFlowEntryUpdated(FlowEntry flowEntry);
+
+ /**
+ * Receive a notification that a Topology Element is added.
+ *
+ * @param topologyElement the Topology Element that is added.
+ */
+ void notificationRecvTopologyElementAdded(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is removed.
+ *
+ * @param topologyElement the Topology Element that is removed.
+ */
+ void notificationRecvTopologyElementRemoved(TopologyElement topologyElement);
+
+ /**
+ * Receive a notification that a Topology Element is updated.
+ *
+ * @param topologyElement the Topology Element that is updated.
+ */
+ void notificationRecvTopologyElementUpdated(TopologyElement topologyElement);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
index df11d6b..b06d844 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/IFlowService.java
@@ -4,6 +4,7 @@
import net.floodlightcontroller.core.module.IFloodlightService;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
+import net.onrc.onos.ofcontroller.topology.Topology;
import net.onrc.onos.ofcontroller.util.CallerId;
import net.onrc.onos.ofcontroller.util.DataPathEndpoints;
import net.onrc.onos.ofcontroller.util.FlowId;
@@ -21,12 +22,9 @@
*
* @param flowPath the Flow Path to install.
* @param flowId the return-by-reference Flow ID as assigned internally.
- * @param dataPathSummaryStr the data path summary string if the added
- * flow will be maintained internally, otherwise null.
* @return true on success, otherwise false.
*/
- boolean addFlow(FlowPath flowPath, FlowId flowId,
- String dataPathSummaryStr);
+ boolean addFlow(FlowPath flowPath, FlowId flowId);
/**
* Delete all previously added flows.
@@ -44,21 +42,6 @@
boolean deleteFlow(FlowId flowId);
/**
- * Clear the state for all previously added flows.
- *
- * @return true on success, otherwise false.
- */
- boolean clearAllFlows();
-
- /**
- * Clear the state for a previously added flow.
- *
- * @param flowId the Flow ID of the flow to clear.
- * @return true on success, otherwise false.
- */
- boolean clearFlow(FlowId flowId);
-
- /**
* Get a previously added flow.
*
* @param flowId the Flow ID of the flow to get.
@@ -67,6 +50,13 @@
FlowPath getFlow(FlowId flowId);
/**
+ * Get all installed flows by all installers.
+ *
+ * @return the Flow Paths if found, otherwise null.
+ */
+ ArrayList<FlowPath> getAllFlows();
+
+ /**
* Get all previously added flows by a specific installer for a given
* data path endpoints.
*
@@ -95,13 +85,6 @@
ArrayList<IFlowPath> getAllFlowsSummary(FlowId flowId, int maxFlows);
/**
- * Get all installed flows by all installers.
- *
- * @return the Flow Paths if found, otherwise null.
- */
- ArrayList<FlowPath> getAllFlows();
-
- /**
* Add and maintain a shortest-path flow.
*
* NOTE: The Flow Path argument does NOT contain all flow entries.
@@ -114,5 +97,20 @@
* conditions to install.
* @return the added shortest-path flow on success, otherwise null.
*/
- public FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+ FlowPath addAndMaintainShortestPathFlow(FlowPath flowPath);
+
+ /**
+ * Get the network topology.
+ *
+ * @return the network topology.
+ */
+ Topology getTopology();
+
+ /**
+ * Get a globally unique flow ID from the flow service.
+ * NOTE: Not currently guaranteed to be globally unique.
+ *
+ * @return unique flow ID
+ */
+ public long getNextFlowEntryId();
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
index 7464ec5..0926f91 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddFlowResource.java
@@ -22,7 +22,7 @@
*/
public class AddFlowResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(AddFlowResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(AddFlowResource.class);
/**
* Implement the API.
@@ -64,7 +64,7 @@
// Process the request
if (flowPath != null) {
- if (flowService.addFlow(flowPath, result, null) != true) {
+ if (flowService.addFlow(flowPath, result) != true) {
result = new FlowId(); // Error: Return empty Flow Id
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
index c454b0f..7a4e88c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/AddShortestPathFlowResource.java
@@ -22,7 +22,7 @@
*/
public class AddShortestPathFlowResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(AddShortestPathFlowResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(AddShortestPathFlowResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
deleted file mode 100644
index db591ae..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/ClearFlowResource.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package net.onrc.onos.ofcontroller.flowmanager.web;
-
-import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
-import net.onrc.onos.ofcontroller.util.FlowId;
-
-import org.restlet.resource.Get;
-import org.restlet.resource.ServerResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Flow Manager REST API implementation: Clear internal Flow state.
- *
- * The "{flow-id}" request attribute value can be either a specific Flow ID,
- * or the keyword "all" to clear all Flows:
- *
- * GET /wm/flow/clear/{flow-id}/json
- */
-public class ClearFlowResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(ClearFlowResource.class);
-
- /**
- * Implement the API.
- *
- * @return true on success, otehrwise false.
- */
- @Get("json")
- public Boolean retrieve() {
- Boolean result = false;
-
- IFlowService flowService =
- (IFlowService)getContext().getAttributes().
- get(IFlowService.class.getCanonicalName());
-
- if (flowService == null) {
- log.debug("ONOS Flow Service not found");
- return result;
- }
-
- // Extract the arguments
- String flowIdStr = (String) getRequestAttributes().get("flow-id");
-
- // Process the request
- if (flowIdStr.equals("all")) {
- log.debug("Clear All Flows");
- result = flowService.clearAllFlows();
- } else {
- FlowId flowId = new FlowId(flowIdStr);
- log.debug("Clear Flow Id: " + flowIdStr);
- result = flowService.clearFlow(flowId);
- }
- return result;
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DatapathSummarySerializer.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DatapathSummarySerializer.java
index 9133077..5fa472f 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DatapathSummarySerializer.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DatapathSummarySerializer.java
@@ -10,7 +10,7 @@
import org.slf4j.LoggerFactory;
public class DatapathSummarySerializer extends JsonSerializer<String>{
- static Logger log = LoggerFactory.getLogger(DatapathSummarySerializer.class);
+ protected final static Logger log = LoggerFactory.getLogger(DatapathSummarySerializer.class);
@Override
public void serialize(String datapathSummary, JsonGenerator jGen,
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
index 5b2bad4..f4e23b8 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/DeleteFlowResource.java
@@ -17,7 +17,7 @@
* GET /wm/flow/delete/{flow-id}/json
*/
public class DeleteFlowResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(DeleteFlowResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(DeleteFlowResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
index e027270..81d26dd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/FlowWebRoutable.java
@@ -18,7 +18,6 @@
Router router = new Router(context);
router.attach("/add/json", AddFlowResource.class);
router.attach("/add-shortest-path/json", AddShortestPathFlowResource.class);
- router.attach("/clear/{flow-id}/json", ClearFlowResource.class);
router.attach("/delete/{flow-id}/json", DeleteFlowResource.class);
router.attach("/get/{flow-id}/json", GetFlowByIdResource.class);
router.attach("/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json", GetAllFlowsByInstallerIdResource.class);
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java
index 0308e89..1ac98c0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByEndpointsResource.java
@@ -30,7 +30,7 @@
* GET /wm/flow/getall-by-endpoints/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json"
*/
public class GetAllFlowsByEndpointsResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(GetAllFlowsByEndpointsResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(GetAllFlowsByEndpointsResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java
index 43f1618..870548e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsByInstallerIdResource.java
@@ -33,7 +33,7 @@
* GET /wm/flow/getall-by-installer-id/{installer-id}/{src-dpid}/{src-port}/{dst-dpid}/{dst-port}/json"
*/
public class GetAllFlowsByInstallerIdResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(GetAllFlowsByInstallerIdResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(GetAllFlowsByInstallerIdResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsResource.java
index ab88348..a4ea960 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetAllFlowsResource.java
@@ -16,7 +16,7 @@
* GET /wm/flow/getall/json"
*/
public class GetAllFlowsResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(GetAllFlowsResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(GetAllFlowsResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
index 77a898c..9ceef6e 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetFlowByIdResource.java
@@ -17,7 +17,7 @@
* GET /wm/flow/get/{flow-id}/json
*/
public class GetFlowByIdResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(GetFlowByIdResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(GetFlowByIdResource.class);
/**
* Implement the API.
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
index 67e1ad6..89e5b01 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/web/GetSummaryFlowsResource.java
@@ -23,7 +23,7 @@
* GET /wm/flow/getsummary/{flow-id}/{max-flows}/json"
*/
public class GetSummaryFlowsResource extends ServerResource {
- protected static Logger log = LoggerFactory.getLogger(GetSummaryFlowsResource.class);
+ protected final static Logger log = LoggerFactory.getLogger(GetSummaryFlowsResource.class);
/**
* Implement the API.