Giant patch of changes to support OpenFlow 1.3
The following people have contributed to this patch:
- Ali Al-Shabibi <alshabibi.ali@gmail.com>
- Ayaka Koshibe <ayaka@onlab.us>
- Brian O'Connor <bocon@onlab.us>
- Jonathan Hart <jono@onlab.us>
- Matteo Gerola <mgerola@create-net.org>
- Michele Santuari <michele.santuari@create-net.org>
- Pavlin Radoslavov <pavlin@onlab.us>
- Saurav Das <sauravdas@alumni.stanford.edu>
- Toshio Koide <t-koide@onlab.us>
- Yuta HIGUCHI <y-higuchi@onlab.us>
The patch includes the following changes:
- New Floodlight I/O loop / state machine
- New switch/port handling
- New role management (incl. Role.EQUAL)
- Added Floodlight debug framework
- Updates to Controller.java
- Move to Loxigen's OpenflowJ library
- Added OF1.3 support
- Added support for different switches (via DriverManager)
- Updated ONOS modules to use new APIs
- Added and updated unit tests
Change-Id: Ic70a8d50f7136946193d2ba2e4dc0b4bfac5f599
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
index 046d130..4a62947 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowProgrammer.java
@@ -7,6 +7,7 @@
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.IOFSwitch.PortChangeType;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -16,19 +17,20 @@
import net.onrc.onos.core.flowprogrammer.web.FlowProgrammerWebRoutable;
import net.onrc.onos.core.registry.IControllerRegistryService;
+import org.projectfloodlight.openflow.protocol.OFPortDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * FlowProgrammer is a module responsible to maintain flows installed to switches.
- * FlowProgrammer consists of FlowPusher and FlowSynchronizer.
- * FlowPusher manages the rate of installation, and FlowSynchronizer synchronizes
- * flows between GraphDB and switches.
- * FlowProgrammer also watch the event of addition/deletion of switches to
- * start/stop synchronization. When a switch is added to network, FlowProgrammer
- * immediately kicks synchronization to keep switch's flow table latest state.
- * Adversely, when a switch is removed from network, FlowProgrammer immediately
- * stops synchronization.
+ * FlowProgrammer is a module responsible to maintain flows installed to
+ * switches. FlowProgrammer consists of FlowPusher and FlowSynchronizer.
+ * FlowPusher manages the rate of installation, and FlowSynchronizer
+ * synchronizes flows between GraphDB and switches. FlowProgrammer also watch
+ * the event of addition/deletion of switches to start/stop synchronization.
+ * When a switch is added to network, FlowProgrammer immediately kicks
+ * synchronization to keep switch's flow table latest state. Adversely, when a
+ * switch is removed from network, FlowProgrammer immediately stops
+ * synchronization.
*/
public class FlowProgrammer implements IFloodlightModule,
IOFSwitchListener {
@@ -57,7 +59,7 @@
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
registryService = context.getServiceImpl(IControllerRegistryService.class);
restApi = context.getServiceImpl(IRestApiService.class);
- pusher.init(null, context, floodlightProvider.getOFMessageFactory(), null);
+ pusher.init(context);
if (ENABLE_FLOW_SYNC) {
synchronizer.init(pusher);
}
@@ -83,8 +85,7 @@
@Override
public Map<Class<? extends IFloodlightService>, IFloodlightService> getServiceImpls() {
- Map<Class<? extends IFloodlightService>,
- IFloodlightService> m =
+ Map<Class<? extends IFloodlightService>, IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
m.put(IFlowPusherService.class, pusher);
@@ -103,25 +104,56 @@
return l;
}
+ // ***********************
+ // IOFSwitchListener
+ // ***********************
+
@Override
public String getName() {
- // TODO Auto-generated method stub
return "FlowProgrammer";
}
@Override
- public void addedSwitch(IOFSwitch sw) {
- log.debug("Switch added: {}", sw.getId());
+ public void switchActivatedMaster(long swId) {
+ IOFSwitch sw = floodlightProvider.getSwitch(swId);
+ if (sw == null) {
+ log.warn("Added switch not available {} ", swId);
+ return;
+ }
+ log.debug("Switch added: {}", swId);
if (ENABLE_FLOW_SYNC) {
- if (registryService.hasControl(sw.getId())) {
+ if (registryService.hasControl(swId)) {
synchronizer.synchronize(sw);
}
}
}
@Override
- public void removedSwitch(IOFSwitch sw) {
+ public void switchActivatedEqual(long swId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void switchMasterToEqual(long swId) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void switchEqualToMaster(long swId) {
+ // for now treat as switchActivatedMaster
+ switchActivatedMaster(swId);
+ }
+
+ @Override
+ public void switchDisconnected(long swId) {
+ IOFSwitch sw = floodlightProvider.getSwitch(swId);
+ if (sw == null) {
+ log.warn("Removed switch not available {} ", swId);
+ return;
+ }
log.debug("Switch removed: {}", sw.getId());
if (ENABLE_FLOW_SYNC) {
@@ -132,7 +164,7 @@
}
@Override
- public void switchPortChanged(Long switchId) {
+ public void switchPortChanged(long swId, OFPortDesc port, PortChangeType pct) {
// TODO Auto-generated method stub
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
index 37e1b44..17bbc1a 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -4,7 +4,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -25,48 +24,17 @@
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.threadpool.IThreadPoolService;
-import net.floodlightcontroller.util.MACAddress;
import net.floodlightcontroller.util.OFMessageDamper;
-import net.onrc.onos.core.util.FlowEntry;
-import net.onrc.onos.core.util.FlowEntryAction;
-import net.onrc.onos.core.util.FlowEntryAction.ActionEnqueue;
-import net.onrc.onos.core.util.FlowEntryAction.ActionOutput;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetEthernetAddr;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetIPv4Addr;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetIpToS;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetTcpUdpPort;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanId;
-import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanPriority;
-import net.onrc.onos.core.util.FlowEntryAction.ActionStripVlan;
-import net.onrc.onos.core.util.FlowEntryActions;
-import net.onrc.onos.core.util.FlowEntryMatch;
-import net.onrc.onos.core.util.FlowEntryUserState;
-import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.intent.FlowEntry;
import net.onrc.onos.core.util.Pair;
-import net.onrc.onos.core.util.PortNumber;
-import org.openflow.protocol.OFBarrierReply;
-import org.openflow.protocol.OFBarrierRequest;
-import org.openflow.protocol.OFFlowMod;
-import org.openflow.protocol.OFMatch;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFPacketOut;
-import org.openflow.protocol.OFPort;
-import org.openflow.protocol.OFType;
-import org.openflow.protocol.action.OFAction;
-import org.openflow.protocol.action.OFActionDataLayerDestination;
-import org.openflow.protocol.action.OFActionDataLayerSource;
-import org.openflow.protocol.action.OFActionEnqueue;
-import org.openflow.protocol.action.OFActionNetworkLayerDestination;
-import org.openflow.protocol.action.OFActionNetworkLayerSource;
-import org.openflow.protocol.action.OFActionNetworkTypeOfService;
-import org.openflow.protocol.action.OFActionOutput;
-import org.openflow.protocol.action.OFActionStripVirtualLan;
-import org.openflow.protocol.action.OFActionTransportLayerDestination;
-import org.openflow.protocol.action.OFActionTransportLayerSource;
-import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
-import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
-import org.openflow.protocol.factory.BasicFactory;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
+import org.projectfloodlight.openflow.protocol.OFFactory;
+import org.projectfloodlight.openflow.protocol.OFFlowMod;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.OFVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,14 +43,13 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
/**
- * FlowPusher is a implementation of FlowPusherService.
- * FlowPusher assigns one message queue instance for each one switch.
- * Number of message processing threads is configurable by constructor, and
- * one thread can handle multiple message queues. Each queue will be assigned to
- * a thread according to hash function defined by getHash().
- * Each processing thread reads messages from queues and sends it to switches
- * in round-robin. Processing thread also calculates rate of sending to suppress
- * excessive message sending.
+ * FlowPusher is a implementation of FlowPusherService. FlowPusher assigns one
+ * message queue instance for each one switch. Number of message processing
+ * threads is configurable by constructor, and one thread can handle multiple
+ * message queues. Each queue will be assigned to a thread according to hash
+ * function defined by getHash(). Each processing thread reads messages from
+ * queues and sends it to switches in round-robin. Processing thread also
+ * calculates rate of sending to suppress excessive message sending.
*/
public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
@@ -91,8 +58,9 @@
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
- protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find sweet spot
- protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+ protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find
+ // sweet spot
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
// Number of messages sent to switch at once
protected static final int MAX_MESSAGE_SEND = 100;
@@ -110,15 +78,15 @@
}
/**
- * SwitchQueue represents message queue attached to a switch.
- * This consists of queue itself and variables used for limiting sending rate.
+ * SwitchQueue represents message queue attached to a switch. This consists
+ * of queue itself and variables used for limiting sending rate.
*/
private static class SwitchQueue {
List<Queue<SwitchQueueEntry>> rawQueues;
QueueState state;
// Max rate of sending message (bytes/ms). 0 implies no limitation.
- long maxRate = 0; // 0 indicates no limitation
+ long maxRate = 0; // 0 indicates no limitation
long lastSentTime = 0;
long lastSentSize = 0;
@@ -137,7 +105,7 @@
/**
* Check if sending rate is within the rate.
- *
+ * <p>
* @param current Current time
* @return true if within the rate
*/
@@ -158,9 +126,9 @@
/**
* Log time and size of last sent data.
- *
+ * <p>
* @param current Time to be sent.
- * @param size Size of sent data (in bytes).
+ * @param size Size of sent data (in bytes).
*/
void logSentData(long current, long size) {
lastSentTime = current;
@@ -178,52 +146,52 @@
/**
* Poll single appropriate entry object according to QueueState.
- *
+ * <p>
* @return Entry object.
*/
SwitchQueueEntry poll() {
switch (state) {
- case READY: {
- for (int i = 0; i < rawQueues.size(); ++i) {
- SwitchQueueEntry entry = rawQueues.get(i).poll();
- if (entry != null) {
- return entry;
- }
+ case READY: {
+ for (int i = 0; i < rawQueues.size(); ++i) {
+ SwitchQueueEntry entry = rawQueues.get(i).poll();
+ if (entry != null) {
+ return entry;
}
+ }
- return null;
- }
- case SUSPENDED: {
- // Only polling from high priority queue
- SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
- return entry;
- }
- default:
- log.error("Unexpected QueueState: {}", state);
- return null;
+ return null;
+ }
+ case SUSPENDED: {
+ // Only polling from high priority queue
+ SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+ return entry;
+ }
+ default:
+ log.error("Unexpected QueueState: {}", state);
+ return null;
}
}
/**
* Check if this object has any messages in the queues to be sent.
- *
+ * <p>
* @return True if there are some messages to be sent.
*/
boolean hasMessageToSend() {
switch (state) {
- case READY:
- for (Queue<SwitchQueueEntry> queue : rawQueues) {
- if (!queue.isEmpty()) {
- return true;
- }
+ case READY:
+ for (Queue<SwitchQueueEntry> queue : rawQueues) {
+ if (!queue.isEmpty()) {
+ return true;
}
- break;
- case SUSPENDED:
- // Only checking high priority queue
- return (!getQueue(MsgPriority.HIGH).isEmpty());
- default:
- log.error("Unexpected QueueState: {}", state);
- return false;
+ }
+ break;
+ case SUSPENDED:
+ // Only checking high priority queue
+ return (!getQueue(MsgPriority.HIGH).isEmpty());
+ default:
+ log.error("Unexpected QueueState: {}", state);
+ return false;
}
return false;
@@ -239,7 +207,7 @@
*/
private static final class BarrierInfo {
final long dpid;
- final int xid;
+ final long xid;
static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
return new BarrierInfo(sw.getId(), req.getXid());
@@ -249,7 +217,7 @@
return new BarrierInfo(sw.getId(), rpy.getXid());
}
- private BarrierInfo(long dpid, int xid) {
+ private BarrierInfo(long dpid, long xid) {
this.dpid = dpid;
this.xid = xid;
}
@@ -260,7 +228,7 @@
final int prime = 31;
int result = 1;
result = prime * result + (int) (dpid ^ (dpid >>> 32));
- result = prime * result + xid;
+ result = prime * result + (int) (xid ^ (xid >>> 32));
return result;
}
@@ -280,32 +248,30 @@
return (this.dpid == other.dpid) && (this.xid == other.xid);
}
-
}
+ private FloodlightModuleContext context = null;
private OFMessageDamper messageDamper = null;
private IThreadPoolService threadPool = null;
-
- private FloodlightContext context = null;
- private BasicFactory factory = null;
+ private IFloodlightProviderService floodlightProvider = null;
+ protected Map<OFVersion, OFFactory> ofFactoryMap = null;
// Map of threads versus dpid
private Map<Long, FlowPusherThread> threadMap = null;
// Map from (DPID and transaction ID) to Future objects.
- private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
- = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
+ new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
private int numberThread;
/**
* Main thread that reads messages from queues and sends them to switches.
*/
- private class FlowPusherThread extends Thread {
- // Weak ConncurrentHashMap
- private Map<IOFSwitch, SwitchQueue> assignedQueues
- = CacheBuilder.newBuilder()
- .weakKeys()
- .<IOFSwitch, SwitchQueue>build().asMap();
+ private static class FlowPusherThread extends Thread {
+ // Weak ConcurrentHashMap
+ private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
+ .weakKeys()
+ .<IOFSwitch, SwitchQueue>build().asMap();
final Lock queuingLock = new ReentrantLock();
final Condition messagePushed = queuingLock.newCondition();
@@ -329,9 +295,8 @@
}
}
- for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues.entrySet().iterator();
- it.hasNext();
- ) {
+ for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
+ .entrySet().iterator(); it.hasNext();) {
Entry<IOFSwitch, SwitchQueue> entry = it.next();
IOFSwitch sw = entry.getKey();
SwitchQueue queue = entry.getValue();
@@ -352,13 +317,13 @@
}
/**
- * Read messages from queue and send them to the switch.
- * If number of messages excess the limit, stop sending messages.
- *
- * @param sw Switch to which messages will be sent.
- * @param queue Queue of messages.
- * @param maxMsg Limitation of number of messages to be sent. If set to 0,
- * all messages in queue will be sent.
+ * Read messages from queue and send them to the switch. If number of
+ * messages excess the limit, stop sending messages.
+ * <p>
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param maxMsg Limitation of number of messages to be sent. If set to
+ * 0, all messages in queue will be sent.
*/
private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
// check sending rate and determine it to be sent or not
@@ -381,11 +346,14 @@
OFMessage msg = queueEntry.getOFMessage();
try {
- messageDamper.write(sw, msg, context);
+ // TODO BOC do we need to use the message damper?
+ // messageDamper.write(sw, msg, context);
+ sw.write(msg, null);
if (log.isTraceEnabled()) {
- log.trace("Pusher sends message: {}", msg);
+ log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
}
- size += msg.getLength();
+ // TODO BOC how do we get the size?
+ // size += msg.getLength();
} catch (IOException e) {
log.error("Exception in sending message (" + msg + "):", e);
}
@@ -425,7 +393,7 @@
/**
* Initialize object with threads of given number.
- *
+ * <p>
* @param numberThreadValue Number of threads to handle messages.
*/
public FlowPusher(int numberThreadValue) {
@@ -438,44 +406,42 @@
/**
* Set parameters needed for sending messages.
- *
- * @param floodlightContext FloodlightContext used for sending messages.
- * If null, FlowPusher uses default context.
- * @param modContext FloodlightModuleContext used for acquiring
- * ThreadPoolService and registering MessageListener.
- * @param basicFactory Factory object to create OFMessage objects.
- * @param damper Message damper used for sending messages.
- * If null, FlowPusher creates its own damper object.
+ * <p>
+ * @param floodlightContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
*/
- public void init(FloodlightContext floodlightContext,
- FloodlightModuleContext modContext,
- BasicFactory basicFactory,
- OFMessageDamper damper) {
- context = floodlightContext;
- factory = basicFactory;
- this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
- IFloodlightProviderService flservice
- = modContext.getServiceImpl(IFloodlightProviderService.class);
- flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
+ public void init(FloodlightModuleContext floodlightContext) {
+ this.context = floodlightContext;
+ this.floodlightProvider = context
+ .getServiceImpl(IFloodlightProviderService.class);
+ this.threadPool = context.getServiceImpl(IThreadPoolService.class);
+ this.messageDamper = null;
- if (damper != null) {
- messageDamper = damper;
- } else {
- // use default values
- messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
- EnumSet.of(OFType.FLOW_MOD),
- OFMESSAGE_DAMPER_TIMEOUT);
- }
+ ofFactoryMap = new HashMap<>();
+ ofFactoryMap.put(OFVersion.OF_10, floodlightProvider.getOFMessageFactory_10());
+ ofFactoryMap.put(OFVersion.OF_13, floodlightProvider.getOFMessageFactory_13());
+ floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
+
+ // TODO BOC message damper may not be needed...
+ // if (damper != null) {
+ // messageDamper = damper;
+ // } else {
+ // use default values
+ /*messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);*/
+ // }
}
/**
* Begin processing queue.
*/
public void start() {
- if (factory == null) {
- log.error("FlowPusher not yet initialized.");
- return;
- }
+ // TODO BOC
+ // if (factory == null) {
+ // log.error("FlowPusher not yet initialized.");
+ // return;
+ // }
threadMap = new HashMap<Long, FlowPusherThread>();
for (long i = 0; i < numberThread; ++i) {
@@ -491,7 +457,8 @@
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- // create queue in case suspend is called before first message addition
+ // create queue in case suspend is called before first message
+ // addition
queue = createQueueImpl(sw);
}
@@ -509,7 +476,7 @@
SwitchQueue queue = getQueue(sw);
if (queue == null) {
- log.error("No queue is attached to DPID: {}", sw.getId());
+ log.error("No queue is attached to DPID: {}", sw.getStringId());
return false;
}
@@ -560,7 +527,7 @@
}
if (rate > 0) {
- log.debug("rate for {} is set to {}", sw.getId(), rate);
+ log.debug("rate for {} is set to {}", sw.getStringId(), rate);
synchronized (queue) {
queue.maxRate = rate;
}
@@ -569,7 +536,7 @@
@Override
@SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
- justification = "Future versions of createQueueImpl() might return null")
+ justification = "Future versions of createQueueImpl() might return null")
public boolean createQueue(IOFSwitch sw) {
SwitchQueue queue = createQueueImpl(sw);
@@ -619,13 +586,12 @@
/**
* Invalidate.
- *
+ * <p>
* @param sw switch
- *
* @see OFMessageDamper#invalidate(IOFSwitch)
*/
public void invalidate(IOFSwitch sw) {
- messageDamper.invalidate(sw);
+ // messageDamper.invalidate(sw); currently a null ptr - commenting out
}
@Override
@@ -668,226 +634,9 @@
}
/**
- * Fetch the match conditions.
- * NOTE: The Flow matching conditions common for all Flow Entries are
- * used ONLY if a Flow Entry does NOT have the corresponding matching
- * condition set.
- *
- * @param flowEntryMatch Flow entry to create a matcher for
- * @return open flow matcher for the given values
- */
- private OFMatch computeMatch(FlowEntryMatch flowEntryMatch) {
- OFMatch match = new OFMatch();
- match.setWildcards(OFMatch.OFPFW_ALL);
-
- // Match the Incoming Port
- PortNumber matchInPort = flowEntryMatch.inPort();
- if (matchInPort != null) {
- match.setInputPort(matchInPort.shortValue());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
- }
-
- // Match the Source MAC address
- MACAddress matchSrcMac = flowEntryMatch.srcMac();
- if (matchSrcMac != null) {
- match.setDataLayerSource(matchSrcMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
- }
-
- // Match the Destination MAC address
- MACAddress matchDstMac = flowEntryMatch.dstMac();
- if (matchDstMac != null) {
- match.setDataLayerDestination(matchDstMac.toString());
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
- }
-
- // Match the Ethernet Frame Type
- Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
- if (matchEthernetFrameType != null) {
- match.setDataLayerType(matchEthernetFrameType);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
- }
-
- // Match the VLAN ID
- Short matchVlanId = flowEntryMatch.vlanId();
- if (matchVlanId != null) {
- match.setDataLayerVirtualLan(matchVlanId);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
- }
-
- // Match the VLAN priority
- Byte matchVlanPriority = flowEntryMatch.vlanPriority();
- if (matchVlanPriority != null) {
- match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
- match.setWildcards(match.getWildcards()
- & ~OFMatch.OFPFW_DL_VLAN_PCP);
- }
-
- // Match the Source IPv4 Network prefix
- IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
- if (matchSrcIPv4Net != null) {
- match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
- }
-
- // Natch the Destination IPv4 Network prefix
- IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
- if (matchDstIPv4Net != null) {
- match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
- }
-
- // Match the IP protocol
- Byte matchIpProto = flowEntryMatch.ipProto();
- if (matchIpProto != null) {
- match.setNetworkProtocol(matchIpProto);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
- }
-
- // Match the IP ToS (DSCP field, 6 bits)
- Byte matchIpToS = flowEntryMatch.ipToS();
- if (matchIpToS != null) {
- match.setNetworkTypeOfService(matchIpToS);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
- }
-
- // Match the Source TCP/UDP port
- Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
- if (matchSrcTcpUdpPort != null) {
- match.setTransportSource(matchSrcTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
- }
-
- // Match the Destination TCP/UDP port
- Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
- if (matchDstTcpUdpPort != null) {
- match.setTransportDestination(matchDstTcpUdpPort);
- match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
- }
-
- return match;
- }
-
-
- /**
- * Wrapper object to hold a port number. Used to pass around output ports.
- */
- private static class OutputPort {
- private Short portNumber;
- }
-
- /**
- * Process a flow action entry, putting the resulting flow
- * actions into a list. Will also set the actionOutputPort
- * if one is encountered while processing an action.
- *
- * @param action Flow Entry Action to process
- * @param openFlowActions actions to perform get added to this list
- * @param actionOutputPort this will get set if an action output
- * port is found
- */
- private void processAction(final FlowEntryAction action,
- final List<OFAction> openFlowActions,
- final OutputPort actionOutputPort) {
- ActionOutput actionOutput = action.actionOutput();
- ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
- ActionSetVlanPriority actionSetVlanPriority = action
- .actionSetVlanPriority();
- ActionStripVlan actionStripVlan = action.actionStripVlan();
- ActionSetEthernetAddr actionSetEthernetSrcAddr = action
- .actionSetEthernetSrcAddr();
- ActionSetEthernetAddr actionSetEthernetDstAddr = action
- .actionSetEthernetDstAddr();
- ActionSetIPv4Addr actionSetIPv4SrcAddr = action
- .actionSetIPv4SrcAddr();
- ActionSetIPv4Addr actionSetIPv4DstAddr = action
- .actionSetIPv4DstAddr();
- ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
- ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
- .actionSetTcpUdpSrcPort();
- ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
- .actionSetTcpUdpDstPort();
- ActionEnqueue actionEnqueue = action.actionEnqueue();
-
- if (actionOutput != null) {
- actionOutputPort.portNumber = actionOutput.port().shortValue();
- // XXX: The max length is hard-coded for now
- OFActionOutput ofa = new OFActionOutput(actionOutput.port()
- .shortValue(), (short) 0xffff);
- openFlowActions.add(ofa);
- }
-
- if (actionSetVlanId != null) {
- OFActionVirtualLanIdentifier ofa =
- new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
- openFlowActions.add(ofa);
- }
-
- if (actionSetVlanPriority != null) {
- OFActionVirtualLanPriorityCodePoint ofa =
- new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
- openFlowActions.add(ofa);
- }
-
- if (actionStripVlan != null) {
- if (actionStripVlan.stripVlan()) {
- OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
- openFlowActions.add(ofa);
- }
- }
-
- if (actionSetEthernetSrcAddr != null) {
- OFActionDataLayerSource ofa =
- new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
- openFlowActions.add(ofa);
- }
-
- if (actionSetEthernetDstAddr != null) {
- OFActionDataLayerDestination ofa =
- new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIPv4SrcAddr != null) {
- OFActionNetworkLayerSource ofa =
- new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIPv4DstAddr != null) {
- OFActionNetworkLayerDestination ofa =
- new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
- openFlowActions.add(ofa);
- }
-
- if (actionSetIpToS != null) {
- OFActionNetworkTypeOfService ofa =
- new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
- openFlowActions.add(ofa);
- }
-
- if (actionSetTcpUdpSrcPort != null) {
- OFActionTransportLayerSource ofa =
- new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
- openFlowActions.add(ofa);
- }
-
- if (actionSetTcpUdpDstPort != null) {
- OFActionTransportLayerDestination ofa =
- new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
- openFlowActions.add(ofa);
- }
-
- if (actionEnqueue != null) {
- OFActionEnqueue ofa =
- new OFActionEnqueue(actionEnqueue.port().shortValue(), actionEnqueue.queueId());
- openFlowActions.add(ofa);
- }
- }
-
-
- /**
* Create a message from FlowEntry and add it to the queue of the switch.
- *
- * @param sw Switch to which message is pushed.
+ * <p>
+ * @param sw Switch to which message is pushed.
* @param flowEntry FlowEntry object used for creating message.
* @return true if message is successfully added to a queue.
*/
@@ -895,90 +644,14 @@
//
// Create the OpenFlow Flow Modification Entry to push
//
- OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
- long cookie = flowEntry.flowEntryId().value();
-
- short flowModCommand = OFFlowMod.OFPFC_ADD;
- if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
- flowModCommand = OFFlowMod.OFPFC_ADD;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
- flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
- } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
- flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
- } else {
- // Unknown user state. Ignore the entry
- log.debug(
- "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
- flowEntry.flowEntryId(),
- flowEntry.flowEntryUserState());
- return false;
- }
-
- final FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
- final OFMatch match = computeMatch(flowEntryMatch);
-
- final PortNumber matchInPort = flowEntryMatch.inPort();
- final MACAddress matchSrcMac = flowEntryMatch.srcMac();
- final MACAddress matchDstMac = flowEntryMatch.dstMac();
-
- //
- // Fetch the actions
- //
- final List<OFAction> openFlowActions = new ArrayList<OFAction>();
- final FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
- //
- final OutputPort actionOutputPort = new OutputPort();
- for (FlowEntryAction action : flowEntryActions.actions()) {
- processAction(action, openFlowActions, actionOutputPort);
- }
- int actionsLen = 0;
- for (OFAction ofa : openFlowActions) {
- actionsLen += ofa.getLength();
- }
-
- fm.setIdleTimeout((short) flowEntry.idleTimeout())
- .setHardTimeout((short) flowEntry.hardTimeout())
- .setPriority((short) flowEntry.priority())
- .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
- .setCommand(flowModCommand).setMatch(match)
- .setActions(openFlowActions)
- .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
- fm.setOutPort(OFPort.OFPP_NONE.getValue());
- if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
- || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
- if (actionOutputPort.portNumber != null) {
- fm.setOutPort(actionOutputPort.portNumber);
- }
- }
-
- //
- // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
- // permanent.
- //
- if ((flowEntry.idleTimeout() != 0) ||
- (flowEntry.hardTimeout() != 0)) {
- fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
- }
-
- if (log.isTraceEnabled()) {
- log.trace("Installing flow entry {} into switch DPID: {} " +
- "flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
- , flowEntry.flowEntryUserState()
- , sw.getStringId()
- , flowEntry.flowEntryId()
- , matchSrcMac
- , matchDstMac
- , matchInPort
- , actionOutputPort
- );
- }
-
+ OFFlowMod fm = flowEntry.buildFlowMod(ofFactoryMap.get(sw.getOFVersion()));
+ // log.trace("Pushing flow mod {}", fm);
return addMessageImpl(sw, fm, priority);
}
/**
* Add message to queue.
- *
+ * <p>
* @param sw
* @param msg
* @param priority
@@ -999,7 +672,7 @@
synchronized (queue) {
queue.add(entry, priority);
if (log.isTraceEnabled()) {
- log.trace("Message is pushed: {}", entry.getOFMessage());
+ log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
}
}
@@ -1014,20 +687,18 @@
if (future == null) {
return null;
}
-
try {
return future.get();
} catch (InterruptedException e) {
log.error("InterruptedException", e);
- return null;
} catch (ExecutionException e) {
log.error("ExecutionException", e);
- return null;
}
+ return null;
}
@Override
- public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
// TODO creation of message and future should be moved to OFSwitchImpl
if (sw == null) {
@@ -1035,25 +706,28 @@
}
OFBarrierRequest msg = createBarrierRequest(sw);
-
- OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
+ (int) msg.getXid());
barrierFutures.put(BarrierInfo.create(sw, msg), future);
-
addMessageImpl(sw, msg, MsgPriority.NORMAL);
-
return future;
}
protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
- OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
- msg.setXid(sw.getNextTransactionId());
-
- return msg;
+ OFFactory factory = ofFactoryMap.get(sw.getOFVersion());
+ if (factory == null) {
+ log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
+ sw.getOFVersion());
+ return null;
+ }
+ return factory.buildBarrierRequest()
+ .setXid(sw.getNextTransactionId())
+ .build();
}
/**
* Get a queue attached to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Queue object
*/
@@ -1072,7 +746,7 @@
/**
* Get a hash value correspondent to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Hash value
*/
@@ -1084,7 +758,7 @@
/**
* Get a Thread object which processes the queue attached to a switch.
- *
+ * <p>
* @param sw Switch object
* @return Thread object
*/
@@ -1112,18 +786,17 @@
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
if (log.isTraceEnabled()) {
- log.trace("Received BARRIER_REPLY from: {}", sw.getId());
+ log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
}
if ((msg.getType() != OFType.BARRIER_REPLY) ||
- !(msg instanceof OFBarrierReply)) {
+ !(msg instanceof OFBarrierReply)) {
log.error("Unexpected reply message: {}", msg.getType());
return Command.CONTINUE;
}
OFBarrierReply reply = (OFBarrierReply) msg;
BarrierInfo info = BarrierInfo.create(sw, reply);
-
// Deliver future if exists
OFBarrierReplyFuture future = barrierFutures.get(info);
if (future != null) {
@@ -1133,4 +806,5 @@
return Command.CONTINUE;
}
+
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
index af3cc70..7b2bfd0 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowSynchronizer.java
@@ -1,6 +1,5 @@
package net.onrc.onos.core.flowprogrammer;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -8,12 +7,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import net.floodlightcontroller.core.IOFSwitch;
-import net.onrc.onos.core.flowprogrammer.IFlowPusherService.MsgPriority;
import net.onrc.onos.core.util.FlowEntryId;
import org.openflow.protocol.OFFlowMod;
@@ -99,10 +96,6 @@
Set<FlowEntryWrapper> graphEntries = getFlowEntriesFromGraph();
long step1 = System.nanoTime();
Set<FlowEntryWrapper> switchEntries = getFlowEntriesFromSwitch();
- if (switchEntries == null) {
- log.debug("getFlowEntriesFromSwitch() failed");
- return null;
- }
long step2 = System.nanoTime();
SyncResult result = compare(graphEntries, switchEntries);
long step3 = System.nanoTime();
@@ -184,12 +177,12 @@
Set<FlowEntryWrapper> entries = new HashSet<FlowEntryWrapper>();
// TODO: fix when FlowSynchronizer is refactored
- /*
+ /*
for(IFlowEntry entry : swObj.getFlowEntries()) {
FlowEntryWrapper fe = new FlowEntryWrapper(entry);
entries.add(fe);
}
- */
+ */
return entries;
}
@@ -218,8 +211,9 @@
lengthU += req.getLengthU();
req.setLengthU(lengthU);
- List<OFStatistics> entries = null;
- try {
+ //List<OFStatistics> entries = null;
+ // XXX S when we fix stats, we fix this
+ /*try {
Future<List<OFStatistics>> dfuture = sw.getStatistics(req);
entries = dfuture.get();
} catch (IOException e) {
@@ -231,14 +225,16 @@
} catch (ExecutionException e) {
log.error("Error getting statistics", e);
return null;
- }
+ }*/
Set<FlowEntryWrapper> results = new HashSet<FlowEntryWrapper>();
+ /*
for (OFStatistics result : entries) {
OFFlowStatisticsReply entry = (OFFlowStatisticsReply) result;
FlowEntryWrapper fe = new FlowEntryWrapper(entry);
results.add(fe);
}
+ */
return results;
}
@@ -248,7 +244,7 @@
* FlowEntryWrapper represents abstract FlowEntry which is embodied
* by FlowEntryId (from GraphDB) or OFFlowStatisticsReply (from switch).
*/
- class FlowEntryWrapper {
+ static class FlowEntryWrapper {
FlowEntryId flowEntryId;
// TODO: fix when FlowSynchronizer is refactored
// IFlowEntry iFlowEntry;
@@ -256,12 +252,12 @@
// TODO: fix when FlowSynchronizer is refactored
- /*
+ /*
public FlowEntryWrapper(IFlowEntry entry) {
flowEntryId = new FlowEntryId(entry.getFlowEntryId());
iFlowEntry = entry;
}
- */
+ */
public FlowEntryWrapper(OFFlowStatisticsReply entry) {
flowEntryId = new FlowEntryId(entry.getCookie());
@@ -285,7 +281,7 @@
double startDB = System.nanoTime();
// Get the Flow Entry state from the Network Graph
// TODO: fix when FlowSynchronizer is refactored
- /*
+ /*
if (iFlowEntry == null) {
try {
// TODO: fix when FlowSynchronizer is refactored
@@ -296,13 +292,13 @@
return;
}
}
- */
+ */
dbTime = System.nanoTime() - startDB;
//
// TODO: The old FlowDatabaseOperation class is gone, so the code
//
- /*
+ /*
double startExtract = System.nanoTime();
FlowEntry flowEntry =
FlowDatabaseOperation.extractFlowEntry(iFlowEntry);
@@ -316,7 +312,7 @@
double startPush = System.nanoTime();
pusher.pushFlowEntry(sw, flowEntry, MsgPriority.HIGH);
pushTime = System.nanoTime() - startPush;
- */
+ */
}
/**
@@ -340,7 +336,8 @@
fm.setPriority(statisticsReply.getPriority());
fm.setOutPort(OFPort.OFPP_NONE);
- pusher.add(sw, fm, MsgPriority.HIGH);
+ // XXX BOC commented out pending FlowSync refactor
+ //pusher.add(sw, fm, MsgPriority.HIGH);
}
/**
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
index 37911a8..e94119a 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/IFlowPusherService.java
@@ -5,11 +5,11 @@
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.intent.FlowEntry;
import net.onrc.onos.core.util.Pair;
-import org.openflow.protocol.OFBarrierReply;
-import org.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.projectfloodlight.openflow.protocol.OFMessage;
/**
* FlowPusherService is a service to send message to switches in proper rate.
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java b/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java
index bbd10cb..3991019 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/OFBarrierReplyFuture.java
@@ -6,22 +6,24 @@
import net.floodlightcontroller.core.internal.OFMessageFuture;
import net.floodlightcontroller.threadpool.IThreadPoolService;
-import org.openflow.protocol.OFBarrierReply;
-import org.openflow.protocol.OFMessage;
-import org.openflow.protocol.OFType;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
+import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFType;
+
+//XXX S note that other places are using old OFBarrierReply - so we broke that
public class OFBarrierReplyFuture extends OFMessageFuture<OFBarrierReply> {
protected volatile boolean finished;
public OFBarrierReplyFuture(IThreadPoolService tp,
- IOFSwitch sw, int transactionId) {
+ IOFSwitch sw, int transactionId) {
super(tp, sw, OFType.FEATURES_REPLY, transactionId);
init();
}
public OFBarrierReplyFuture(IThreadPoolService tp,
- IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
+ IOFSwitch sw, int transactionId, long timeout, TimeUnit unit) {
super(tp, sw, OFType.FEATURES_REPLY, transactionId, timeout, unit);
init();
}
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
index 236cc85..4fc6782 100644
--- a/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/web/SendBarrierResource.java
@@ -2,8 +2,8 @@
import net.floodlightcontroller.core.IOFSwitch;
-import org.openflow.protocol.OFBarrierReply;
import org.openflow.util.HexString;
+import org.projectfloodlight.openflow.protocol.OFBarrierReply;
import org.restlet.resource.Get;
/**