Added FlowPath/FlowEntry version interfaces to FlowPusher
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
index a3f602f..0e70a9b 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowManager.java
@@ -785,7 +785,7 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, IFlowPath flowObj,
IFlowEntry flowEntryObj) {
- return pusher.send(mySwitch, flowObj, flowEntryObj);
+ return pusher.add(mySwitch, flowObj, flowEntryObj);
}
/**
@@ -798,14 +798,7 @@
*/
private boolean installFlowEntry(IOFSwitch mySwitch, FlowPath flowPath,
FlowEntry flowEntry) {
- log.debug("Flow is sent to pusher : dpid({}) flow_id({})", mySwitch.getId(), flowEntry.getFlowId());
- // TODO handle this installation by FlowPusher
-
-// return FlowSwitchOperation.installFlowEntry(
-// floodlightProvider.getOFMessageFactory(),
-// messageDamper, mySwitch, flowPath, flowEntry);
-
- return true;
+ return pusher.add(mySwitch, flowPath, flowEntry);
}
/**
@@ -920,7 +913,7 @@
return;
}
- pusher.send(sw, msg);
+ pusher.add(sw, msg);
}
@Override
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
index 67d84e2..eef60a1 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/FlowPusher.java
@@ -17,13 +17,20 @@
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
import net.onrc.onos.ofcontroller.util.FlowEntryAction;
import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
+import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryActions;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
+import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
+import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
+import net.onrc.onos.ofcontroller.util.FlowPath;
+import net.onrc.onos.ofcontroller.util.IPv4Net;
+import net.onrc.onos.ofcontroller.util.Port;
/**
* FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
@@ -81,9 +88,9 @@
}
}
- void updateRate(long current, OFMessage msg) {
+ void logSentData(long current, long size) {
last_sent_time = current;
- last_sent_size = msg.getLengthU();
+ last_sent_size = size;
}
}
@@ -127,33 +134,32 @@
// check sending rate and determine it to be sent or not
long current_time = System.nanoTime();
+ long size = 0;
synchronized (queue) {
if (queue.isSendable(current_time)) {
+ // TODO limit number of messages to be sent at once
while (! queue.isEmpty()) {
OFMessage msg = queue.poll();
// if need to send, call IOFSwitch#write()
try {
messageDamper.write(sw, msg, context);
- queue.updateRate(current_time, msg);
+ size += msg.getLength();
log.debug("Pusher sends message : {}", msg);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
+ queue.logSentData(current_time, size);
}
}
+ sw.flush();
}
// sleep while all queues are empty
- while (! isMsgAdded) {
- if (isStopped) {
- log.debug("Pusher Process finished.");
- return;
- }
-
+ while (! (isMsgAdded || isStopped)) {
try {
Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
} catch (InterruptedException e) {
@@ -275,7 +281,7 @@
* @param sw
* @param msg
*/
- public boolean send(IOFSwitch sw, OFMessage msg) {
+ public boolean add(IOFSwitch sw, OFMessage msg) {
SwitchQueue queue = getQueue(sw);
if (queue == null) {
queue = new SwitchQueue();
@@ -303,7 +309,7 @@
* @param flowEntryObj
* @return
*/
- public boolean send(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
+ public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
log.debug("sending : {}, {}", sw, flowObj);
String flowEntryIdStr = flowEntryObj.getFlowEntryId();
if (flowEntryIdStr == null)
@@ -595,7 +601,7 @@
" srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
" inPort: " + matchInPort + " outPort: " + actionOutputPort
);
- send(sw,fm);
+ add(sw,fm);
//
// TODO: We should use the OpenFlow Barrier mechanism
// to check for errors, and update the SwitchState
@@ -607,6 +613,317 @@
return true;
}
+ public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug(
+ "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId().toString(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if ((matchSrcMac == null) && (flowPathMatch != null)) {
+ matchSrcMac = flowPathMatch.srcMac();
+ }
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if ((matchDstMac == null) && (flowPathMatch != null)) {
+ matchDstMac = flowPathMatch.dstMac();
+ }
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
+ matchEthernetFrameType = flowPathMatch.ethernetFrameType();
+ }
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if ((matchVlanId == null) && (flowPathMatch != null)) {
+ matchVlanId = flowPathMatch.vlanId();
+ }
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if ((matchVlanPriority == null) && (flowPathMatch != null)) {
+ matchVlanPriority = flowPathMatch.vlanPriority();
+ }
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards()
+ & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
+ matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
+ }
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
+ matchDstIPv4Net = flowPathMatch.dstIPv4Net();
+ }
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if ((matchIpProto == null) && (flowPathMatch != null)) {
+ matchIpProto = flowPathMatch.ipProto();
+ }
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if ((matchIpToS == null) && (flowPathMatch != null)) {
+ matchIpToS = flowPathMatch.ipToS();
+ }
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
+ }
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
+ matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
+ }
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action
+ .actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+ .actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action
+ .actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+ .actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action
+ .actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+ .actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+ .actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+ .value(), (short) 0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+ actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+ actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+ actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+ actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+ actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+ actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+ actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+ actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+ actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+ .value(), actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
+ .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
+ .setPriority(PRIORITY_DEFAULT)
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+ .setCommand(flowModCommand).setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+ || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // TODO: Set the following flag
+ // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ // See method ForwardingBase::pushRoute()
+ //
+
+ //
+ // Write the message to the switch
+ //
+ log.debug("MEASUREMENT: Installing flow entry "
+ + flowEntry.flowEntryUserState() + " into switch DPID: "
+ + sw.getStringId() + " flowEntryId: "
+ + flowEntry.flowEntryId().toString() + " srcMac: "
+ + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
+ + matchInPort + " outPort: " + actionOutputPort);
+
+ //
+ // TODO: We should use the OpenFlow Barrier mechanism
+ // to check for errors, and update the SwitchState
+ // for a flow entry after the Barrier message is
+ // is received.
+ //
+ // TODO: The FlowEntry Object in Titan should be set
+ // to FE_SWITCH_UPDATED.
+ //
+
+ return add(sw,fm);
+ }
+
private SwitchQueue getQueue(IOFSwitch sw) {
if (sw == null) {
return null;