Giant patch of changes to support OpenFlow 1.3
The following people have contributed to this patch:
- Ali Al-Shabibi <alshabibi.ali@gmail.com>
- Ayaka Koshibe <ayaka@onlab.us>
- Brian O'Connor <bocon@onlab.us>
- Jonathan Hart <jono@onlab.us>
- Matteo Gerola <mgerola@create-net.org>
- Michele Santuari <michele.santuari@create-net.org>
- Pavlin Radoslavov <pavlin@onlab.us>
- Saurav Das <sauravdas@alumni.stanford.edu>
- Toshio Koide <t-koide@onlab.us>
- Yuta HIGUCHI <y-higuchi@onlab.us>
The patch includes the following changes:
- New Floodlight I/O loop / state machine
- New switch/port handling
- New role management (incl. Role.EQUAL)
- Added Floodlight debug framework
- Updates to Controller.java
- Move to LoxiGen's OpenFlowJ library
- Added OF1.3 support
- Added support for different switches (via DriverManager)
- Updated ONOS modules to use new APIs
- Added and updated unit tests
Change-Id: Ic70a8d50f7136946193d2ba2e4dc0b4bfac5f599
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
index 37e1b44..17bbc1a 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -4,7 +4,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -25,48 +24,17 @@
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.threadpool.IThreadPoolService;
-import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.core.util.FlowEntry;
-import net.onrc.onos.core.util.FlowEntryAction;
-import net.onrc.onos.core.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.core.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.core.util.FlowEntryAction.ActionStripVlan;
-import net.onrc.onos.core.util.FlowEntryActions;
-import net.onrc.onos.core.util.FlowEntryMatch;
-import net.onrc.onos.core.util.FlowEntryUserState;
-import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.intent.FlowEntry;
import net.onrc.onos.core.util.Pair;
-import net.onrc.onos.core.util.PortNumber;
-import org.openflow.protocol.OFBarrierReply;
-import org.openflow.protocol.OFBarrierRequest;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
-import org.openflow.protocol.factory.BasicFactory;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,14 +43,13 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
- * FlowPusher is a implementation of FlowPusherService.
- * FlowPusher assigns one message queue instance for each one switch.
- * Number of message processing threads is configurable by constructor, and
- * one thread can handle multiple message queues. Each queue will be assigned to
- * a thread according to hash function defined by getHash().
- * Each processing thread reads messages from queues and sends it to switches
- * in round-robin. Processing thread also calculates rate of sending to suppress
- * excessive message sending.
+ * FlowPusher is an implementation of IFlowPusherService. It assigns one
+ * message queue instance to each switch. The number of message processing
+ * threads is configurable via the constructor, and one thread can handle
+ * multiple message queues. Each queue is assigned to a thread according to
+ * the hash function defined by getHash(). Each processing thread reads
+ * messages from its queues and sends them to switches in round-robin order.
+ * It also calculates the sending rate to suppress excessive message sending.
*/
public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
@@ -91,8 +58,9 @@
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
- protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find sweet spot
- protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+ protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find
+ // sweet spot
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -110,15 +78,15 @@
}
/**
- * SwitchQueue represents message queue attached to a switch.
- * This consists of queue itself and variables used for limiting sending rate.
+ * SwitchQueue represents a message queue attached to a switch. It consists
+ * of the queue itself and the variables used for limiting the sending rate.
*/
private static class SwitchQueue {
List<Queue<SwitchQueueEntry>> rawQueues;
QueueState state;
// Max rate of sending message (bytes/ms). 0 implies no limitation.
- long maxRate = 0; // 0 indicates no limitation
+ long maxRate = 0; // 0 indicates no limitation
long lastSentTime = 0;
long lastSentSize = 0;
@@ -137,7 +105,7 @@
/**
* Check if sending rate is within the rate.
- *
+ * <p>
* @param current Current time
* @return true if within the rate
*/
@@ -158,9 +126,9 @@
/**
* Log time and size of last sent data.
- *
+ * <p>
* @param current Time to be sent.
- * @param size Size of sent data (in bytes).
+ * @param size Size of sent data (in bytes).
*/
void logSentData(long current, long size) {
lastSentTime = current;
@@ -178,52 +146,52 @@
/**
* Poll single appropriate entry object according to QueueState.
- *
+ * <p>
* @return Entry object.
*/
SwitchQueueEntry poll() {
switch (state) {
- case READY: {
- for (int i = 0; i < rawQueues.size(); ++i) {
- SwitchQueueEntry entry = rawQueues.get(i).poll();
- if (entry != null) {
- return entry;
- }
+ case READY: {
+ for (int i = 0; i < rawQueues.size(); ++i) {
+ SwitchQueueEntry entry = rawQueues.get(i).poll();
+ if (entry != null) {
+ return entry;
}
+ }
- return null;
- }
- case SUSPENDED: {
- // Only polling from high priority queue
- SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
- return entry;
- }
- default:
- log.error("Unexpected QueueState: {}", state);
- return null;
+ return null;
+ }
+ case SUSPENDED: {
+ // Only polling from high priority queue
+ SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+ return entry;
+ }
+ default:
+ log.error("Unexpected QueueState: {}", state);
+ return null;
}
}
/**
* Check if this object has any messages in the queues to be sent.
- *
+ * <p>
* @return True if there are some messages to be sent.
*/
boolean hasMessageToSend() {
switch (state) {
- case READY:
- for (Queue<SwitchQueueEntry> queue : rawQueues) {
- if (!queue.isEmpty()) {
- return true;
- }
+ case READY:
+ for (Queue<SwitchQueueEntry> queue : rawQueues) {
+ if (!queue.isEmpty()) {
+ return true;
}
- break;
- case SUSPENDED:
- // Only checking high priority queue
- return (!getQueue(MsgPriority.HIGH).isEmpty());
- default:
- log.error("Unexpected QueueState: {}", state);
- return false;
+ }
+ break;
+ case SUSPENDED:
+ // Only checking high priority queue
+ return (!getQueue(MsgPriority.HIGH).isEmpty());
+ default:
+ log.error("Unexpected QueueState: {}", state);
+ return false;
}
return false;
@@ -239,7 +207,7 @@
*/
private static final class BarrierInfo {
final long dpid;
- final int xid;
+ final long xid;
static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
return new BarrierInfo(sw.getId(), req.getXid());
@@ -249,7 +217,7 @@
return new BarrierInfo(sw.getId(), rpy.getXid());
}
- private BarrierInfo(long dpid, int xid) {
+ private BarrierInfo(long dpid, long xid) {
this.dpid = dpid;
this.xid = xid;
}
@@ -260,7 +228,7 @@
final int prime = 31;
int result = 1;
result = prime * result + (int) (dpid ^ (dpid >>> 32));
- result = prime * result + xid;
+ result = prime * result + (int) (xid ^ (xid >>> 32));
return result;
}
@@ -280,32 +248,30 @@
return (this.dpid == other.dpid) && (this.xid == other.xid);
}
-
}
+ private FloodlightModuleContext context = null;
private OFMessageDamper messageDamper = null;
private IThreadPoolService threadPool = null;
-
- private FloodlightContext context = null;
- private BasicFactory factory = null;
+ private IFloodlightProviderService floodlightProvider = null;
+ protected Map<OFVersion, OFFactory> ofFactoryMap = null;
// Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
// Map from (DPID and transaction ID) to Future objects.
- private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
- = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
+ new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
private int numberThread;
/**
* Main thread that reads messages from queues and sends them to switches.
*/
- private class FlowPusherThread extends Thread {
- // Weak ConncurrentHashMap
- private Map<IOFSwitch, SwitchQueue> assignedQueues
- = CacheBuilder.newBuilder()
- .weakKeys()
- .<IOFSwitch, SwitchQueue>build().asMap();
+ private static class FlowPusherThread extends Thread {
+ // Weak ConcurrentHashMap
+ private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
+ .weakKeys()
+ .<IOFSwitch, SwitchQueue>build().asMap();
final Lock queuingLock = new ReentrantLock();
final Condition messagePushed = queuingLock.newCondition();
@@ -329,9 +295,8 @@
}
}
- for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues.entrySet().iterator();
- it.hasNext();
- ) {
+ for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
+ .entrySet().iterator(); it.hasNext();) {
Entry<IOFSwitch, SwitchQueue> entry = it.next();
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -352,13 +317,13 @@
}
/**
- * Read messages from queue and send them to the switch.
- * If number of messages excess the limit, stop sending messages.
- *
- * @param sw Switch to which messages will be sent.
- * @param queue Queue of messages.
- * @param maxMsg Limitation of number of messages to be sent. If set to 0,
- * all messages in queue will be sent.
+ * Read messages from the queue and send them to the switch. If the number
+ * of messages exceeds the limit, stop sending messages.
+ * <p>
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param maxMsg Limitation of number of messages to be sent. If set to
+ * 0, all messages in queue will be sent.
*/
private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
// check sending rate and determine it to be sent or not
@@ -381,11 +346,14 @@
OFMessage msg = queueEntry.getOFMessage();
try {
- messageDamper.write(sw, msg, context);
+ // TODO BOC do we need to use the message damper?
+ // messageDamper.write(sw, msg, context);
+ sw.write(msg, null);
if (log.isTraceEnabled()) {
- log.trace("Pusher sends message: {}", msg);
+ log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
}
- size += msg.getLength();
+ // TODO BOC how do we get the size?
+ // size += msg.getLength();
} catch (IOException e) {
log.error("Exception in sending message (" + msg + "):", e);
}
@@ -425,7 +393,7 @@
/**
* Initialize object with threads of given number.
- *
+ * <p>
* @param numberThreadValue Number of threads to handle messages.
*/
public FlowPusher(int numberThreadValue) {
@@ -438,44 +406,42 @@
/**
* Set parameters needed for sending messages.
- *
- * @param floodlightContext FloodlightContext used for sending messages.
- * If null, FlowPusher uses default context.
- * @param modContext FloodlightModuleContext used for acquiring
- * ThreadPoolService and registering MessageListener.
- * @param basicFactory Factory object to create OFMessage objects.
- * @param damper Message damper used for sending messages.
- * If null, FlowPusher creates its own damper object.
+ * <p>
+ * @param floodlightContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
*/
- public void init(FloodlightContext floodlightContext,
- FloodlightModuleContext modContext,
- BasicFactory basicFactory,
- OFMessageDamper damper) {
- context = floodlightContext;
- factory = basicFactory;
- this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
- IFloodlightProviderService flservice
- = modContext.getServiceImpl(IFloodlightProviderService.class);
- flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
+ public void init(FloodlightModuleContext floodlightContext) {
+ this.context = floodlightContext;
+ this.floodlightProvider = context
+ .getServiceImpl(IFloodlightProviderService.class);
+ this.threadPool = context.getServiceImpl(IThreadPoolService.class);
+ this.messageDamper = null;
- if (damper != null) {
- messageDamper = damper;
- } else {
- // use default values
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
- }
+ ofFactoryMap = new HashMap<>();
+ ofFactoryMap.put(OFVersion.OF_10, floodlightProvider.getOFMessageFactory_10());
+ ofFactoryMap.put(OFVersion.OF_13, floodlightProvider.getOFMessageFactory_13());
+ floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
+
+ // TODO BOC message damper may not be needed...
+ // if (damper != null) {
+ // messageDamper = damper;
+ // } else {
+ // use default values
+ /*messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);*/
+ // }
}
/**
* Begin processing queue.
*/
public void start() {
- if (factory == null) {
- log.error("FlowPusher not yet initialized.");
- return;
- }
+ // TODO BOC
+ // if (factory == null) {
+ // log.error("FlowPusher not yet initialized.");
+ // return;
+ // }
threadMap = new HashMap<Long, FlowPusherThread>();
for (long i = 0; i < numberThread; ++i) {
@@ -491,7 +457,8 @@
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- // create queue in case suspend is called before first message addition
+ // create queue in case suspend is called before first message
+ // addition
queue = createQueueImpl(sw);
}
@@ -509,7 +476,7 @@
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- log.error("No queue is attached to DPID: {}", sw.getId());
+ log.error("No queue is attached to DPID: {}", sw.getStringId());
return false;
}
@@ -560,7 +527,7 @@
}
if (rate > 0) {
- log.debug("rate for {} is set to {}", sw.getId(), rate);
+ log.debug("rate for {} is set to {}", sw.getStringId(), rate);
synchronized (queue) {
queue.maxRate = rate;
}
@@ -569,7 +536,7 @@
@Override
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
- justification = "Future versions of createQueueImpl() might return null")
+ justification = "Future versions of createQueueImpl() might return null")
public boolean createQueue(IOFSwitch sw) {
SwitchQueue queue = createQueueImpl(sw);
@@ -619,13 +586,12 @@
/**
* Invalidate.
- *
+ * <p>
* @param sw switch
- *
* @see OFMessageDamper#invalidate(IOFSwitch)
*/
public void invalidate(IOFSwitch sw) {
- messageDamper.invalidate(sw);
+ // messageDamper.invalidate(sw); currently a null ptr - commenting out
}
@Override
@@ -668,226 +634,9 @@
}
/**
- * Fetch the match conditions.
- * NOTE: The Flow matching conditions common for all Flow Entries are
- * used ONLY if a Flow Entry does NOT have the corresponding matching
- * condition set.
- *
- * @param flowEntryMatch Flow entry to create a matcher for
- * @return open flow matcher for the given values
- */
- private OFMatch computeMatch(FlowEntryMatch flowEntryMatch) {
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- PortNumber matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.shortValue());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards()
- & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- return match;
- }
-
-
- /**
- * Wrapper object to hold a port number. Used to pass around output ports.
- */
- private static class OutputPort {
- private Short portNumber;
- }
-
- /**
- * Process a flow action entry, putting the resulting flow
- * actions into a list. Will also set the actionOutputPort
- * if one is encountered while processing an action.
- *
- * @param action Flow Entry Action to process
- * @param openFlowActions actions to perform get added to this list
- * @param actionOutputPort this will get set if an action output
- * port is found
- */
- private void processAction(final FlowEntryAction action,
- final List<OFAction> openFlowActions,
- final OutputPort actionOutputPort) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action
- .actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action
- .actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action
- .actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action
- .actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action
- .actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
- .actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
- .actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort.portNumber = actionOutput.port().shortValue();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa = new OFActionOutput(actionOutput.port()
- .shortValue(), (short) 0xffff);
- openFlowActions.add(ofa);
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan()) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().shortValue(), actionEnqueue.queueId());
- openFlowActions.add(ofa);
- }
- }
-
-
- /**
* Create a message from FlowEntry and add it to the queue of the switch.
- *
- * @param sw Switch to which message is pushed.
+ * <p>
+ * @param sw Switch to which message is pushed.
* @param flowEntry FlowEntry object used for creating message.
* @return true if message is successfully added to a queue.
*/
@@ -895,90 +644,14 @@
//
// Create the OpenFlow Flow Modification Entry to push
//
- OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug(
- "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId(),
- flowEntry.flowEntryUserState());
- return false;
- }
-
- final FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
- final OFMatch match = computeMatch(flowEntryMatch);
-
- final PortNumber matchInPort = flowEntryMatch.inPort();
- final MACAddress matchSrcMac = flowEntryMatch.srcMac();
- final MACAddress matchDstMac = flowEntryMatch.dstMac();
-
- //
- // Fetch the actions
- //
- final List<OFAction> openFlowActions = new ArrayList<OFAction>();
- final FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- final OutputPort actionOutputPort = new OutputPort();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- processAction(action, openFlowActions, actionOutputPort);
- }
- int actionsLen = 0;
- for (OFAction ofa : openFlowActions) {
- actionsLen += ofa.getLength();
- }
-
- fm.setIdleTimeout((short) flowEntry.idleTimeout())
- .setHardTimeout((short) flowEntry.hardTimeout())
- .setPriority((short) flowEntry.priority())
- .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
- .setCommand(flowModCommand).setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
- || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort.portNumber != null) {
- fm.setOutPort(actionOutputPort.portNumber);
- }
- }
-
- //
- // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
- // permanent.
- //
- if ((flowEntry.idleTimeout() != 0) ||
- (flowEntry.hardTimeout() != 0)) {
- fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- }
-
- if (log.isTraceEnabled()) {
- log.trace("Installing flow entry {} into switch DPID: {} " +
- "flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
- , flowEntry.flowEntryUserState()
- , sw.getStringId()
- , flowEntry.flowEntryId()
- , matchSrcMac
- , matchDstMac
- , matchInPort
- , actionOutputPort
- );
- }
-
+ OFFlowMod fm = flowEntry.buildFlowMod(ofFactoryMap.get(sw.getOFVersion()));
+ // log.trace("Pushing flow mod {}", fm);
return addMessageImpl(sw, fm, priority);
}
/**
* Add message to queue.
- *
+ * <p>
* @param sw
* @param msg
* @param priority
@@ -999,7 +672,7 @@
synchronized (queue) {
queue.add(entry, priority);
if (log.isTraceEnabled()) {
- log.trace("Message is pushed: {}", entry.getOFMessage());
+ log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
}
}
@@ -1014,20 +687,18 @@
if (future == null) {
return null;
}
-
try {
return future.get();
} catch (InterruptedException e) {
log.error("InterruptedException", e);
- return null;
} catch (ExecutionException e) {
log.error("ExecutionException", e);
- return null;
}
+ return null;
}
@Override
- public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
// TODO creation of message and future should be moved to OFSwitchImpl
if (sw == null) {
@@ -1035,25 +706,28 @@
}
OFBarrierRequest msg = createBarrierRequest(sw);
-
- OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
+ (int) msg.getXid());
barrierFutures.put(BarrierInfo.create(sw, msg), future);
-
addMessageImpl(sw, msg, MsgPriority.NORMAL);
-
return future;
}
protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
- OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
- msg.setXid(sw.getNextTransactionId());
-
- return msg;
+ OFFactory factory = ofFactoryMap.get(sw.getOFVersion());
+ if (factory == null) {
+ log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
+ sw.getOFVersion());
+ return null;
+ }
+ return factory.buildBarrierRequest()
+ .setXid(sw.getNextTransactionId())
+ .build();
}
/**
* Get a queue attached to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Queue object
*/
@@ -1072,7 +746,7 @@
/**
* Get a hash value correspondent to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Hash value
*/
@@ -1084,7 +758,7 @@
/**
* Get a Thread object which processes the queue attached to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Thread object
*/
@@ -1112,18 +786,17 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
if (log.isTraceEnabled()) {
- log.trace("Received BARRIER_REPLY from: {}", sw.getId());
+ log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
}
if ((msg.getType() != OFType.BARRIER_REPLY) ||
- !(msg instanceof OFBarrierReply)) {
+ !(msg instanceof OFBarrierReply)) {
log.error("Unexpected reply message: {}", msg.getType());
return Command.CONTINUE;
}
OFBarrierReply reply = (OFBarrierReply) msg;
BarrierInfo info = BarrierInfo.create(sw, reply);
-
// Deliver future if exists
OFBarrierReplyFuture future = barrierFutures.get(info);
if (future != null) {
@@ -1133,4 +806,5 @@
return Command.CONTINUE;
}
+
}