Renamed devicemanager, flowprogrammer, linkdiscovery and util packages
net.onrc.onos.ofcontroller.devicemanager.* => net.onrc.onos.core.devicemanager.*
net.onrc.onos.ofcontroller.flowprogrammer.* => net.onrc.onos.core.flowprogrammer.*
net.onrc.onos.ofcontroller.linkdiscovery.* => net.onrc.onos.core.linkdiscovery.*
net.onrc.onos.ofcontroller.util.* => net.onrc.onos.core.util.*
Change-Id: Iaa865af552e8fb3a589e73d006569ac79f5a0f08
diff --git a/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
new file mode 100644
index 0000000..bdb1f43
--- /dev/null
+++ b/src/main/java/net/onrc/onos/core/flowprogrammer/FlowPusher.java
@@ -0,0 +1,1068 @@
+package net.onrc.onos.core.flowprogrammer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.openflow.protocol.*;
+import org.openflow.protocol.action.*;
+import org.openflow.protocol.factory.BasicFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.floodlightcontroller.core.FloodlightContext;
+import net.floodlightcontroller.core.IFloodlightProviderService;
+import net.floodlightcontroller.core.IOFMessageListener;
+import net.floodlightcontroller.core.IOFSwitch;
+import net.floodlightcontroller.core.internal.OFMessageFuture;
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.threadpool.IThreadPoolService;
+import net.floodlightcontroller.util.MACAddress;
+import net.floodlightcontroller.util.OFMessageDamper;
+import net.onrc.onos.core.util.FlowEntry;
+import net.onrc.onos.core.util.FlowEntryAction;
+import net.onrc.onos.core.util.FlowEntryActions;
+import net.onrc.onos.core.util.FlowEntryMatch;
+import net.onrc.onos.core.util.FlowEntryUserState;
+import net.onrc.onos.core.util.IPv4Net;
+import net.onrc.onos.core.util.Pair;
+import net.onrc.onos.core.util.Port;
+import net.onrc.onos.core.util.FlowEntryAction.*;
+
+/**
+ * FlowPusher is a implementation of FlowPusherService.
+ * FlowPusher assigns one message queue instance for each one switch.
+ * Number of message processing threads is configurable by constructor, and
+ * one thread can handle multiple message queues. Each queue will be assigned to
+ * a thread according to hash function defined by getHash().
+ * Each processing thread reads messages from queues and sends it to switches
+ * in round-robin. Processing thread also calculates rate of sending to suppress
+ * excessive message sending.
+ *
+ * @author Naoki Shiota
+ */
+public class FlowPusher implements IFlowPusherService, IOFMessageListener {
+ private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
+ protected static final int DEFAULT_NUMBER_THREAD = 1;
+
+ // TODO: Values copied from elsewhere (class LearningSwitch).
+ // The local copy should go away!
+ //
+ protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
+ protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
+
+ // Number of messages sent to switch at once
+ protected static final int MAX_MESSAGE_SEND = 100;
+
+ private static class SwitchQueueEntry {
+ OFMessage msg;
+
+ public SwitchQueueEntry(OFMessage msg) {
+ this.msg = msg;
+ }
+
+ public OFMessage getOFMessage() {
+ return msg;
+ }
+ }
+
+ /**
+ * SwitchQueue represents message queue attached to a switch.
+ * This consists of queue itself and variables used for limiting sending rate.
+ *
+ * @author Naoki Shiota
+ */
+ private class SwitchQueue {
+ List<Queue<SwitchQueueEntry>> raw_queues;
+ QueueState state;
+
+ // Max rate of sending message (bytes/ms). 0 implies no limitation.
+ long max_rate = 0; // 0 indicates no limitation
+ long last_sent_time = 0;
+ long last_sent_size = 0;
+
+ // "To be deleted" flag
+ boolean toBeDeleted = false;
+
+ SwitchQueue() {
+ raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
+ MsgPriority.values().length);
+ for (int i = 0; i < MsgPriority.values().length; ++i) {
+ raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
+ }
+
+ state = QueueState.READY;
+ }
+
+ /**
+ * Check if sending rate is within the rate
+ *
+ * @param current Current time
+ * @return true if within the rate
+ */
+ boolean isSendable(long current) {
+ if (max_rate == 0) {
+ // no limitation
+ return true;
+ }
+
+ if (current == last_sent_time) {
+ return false;
+ }
+
+ // Check if sufficient time (from aspect of rate) elapsed or not.
+ long rate = last_sent_size / (current - last_sent_time);
+ return (rate < max_rate);
+ }
+
+ /**
+ * Log time and size of last sent data.
+ *
+ * @param current Time to be sent.
+ * @param size Size of sent data (in bytes).
+ */
+ void logSentData(long current, long size) {
+ last_sent_time = current;
+ last_sent_size = size;
+ }
+
+ boolean add(SwitchQueueEntry entry, MsgPriority priority) {
+ Queue<SwitchQueueEntry> queue = getQueue(priority);
+ if (queue == null) {
+ log.error("Unexpected priority : ", priority);
+ return false;
+ }
+ return queue.add(entry);
+ }
+
+ /**
+ * Poll single appropriate entry object according to QueueState.
+ *
+ * @return Entry object.
+ */
+ SwitchQueueEntry poll() {
+ switch (state) {
+ case READY: {
+ for (int i = 0; i < raw_queues.size(); ++i) {
+ SwitchQueueEntry entry = raw_queues.get(i).poll();
+ if (entry != null) {
+ return entry;
+ }
+ }
+
+ return null;
+ }
+ case SUSPENDED: {
+ // Only polling from high priority queue
+ SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
+ return entry;
+ }
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return null;
+ }
+ }
+
+ /**
+ * Check if this object has any messages in the queues to be sent
+ *
+ * @return True if there are some messages to be sent.
+ */
+ boolean hasMessageToSend() {
+ switch (state) {
+ case READY:
+ for (Queue<SwitchQueueEntry> queue : raw_queues) {
+ if (!queue.isEmpty()) {
+ return true;
+ }
+ }
+ break;
+ case SUSPENDED:
+ // Only checking high priority queue
+ return (!getQueue(MsgPriority.HIGH).isEmpty());
+ default:
+ log.error("Unexpected QueueState : ", state);
+ return false;
+ }
+
+ return false;
+ }
+
+ Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
+ return raw_queues.get(priority.ordinal());
+ }
+ }
+
+ /**
+ * BarrierInfo holds information to specify barrier message sent to switch.
+ *
+ * @author Naoki
+ */
+ private static class BarrierInfo {
+ final long dpid;
+ final int xid;
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
+ return new BarrierInfo(sw.getId(), req.getXid());
+ }
+
+ static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
+ return new BarrierInfo(sw.getId(), rpy.getXid());
+ }
+
+ private BarrierInfo(long dpid, int xid) {
+ this.dpid = dpid;
+ this.xid = xid;
+ }
+
+ // Auto generated code by Eclipse
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (int) (dpid ^ (dpid >>> 32));
+ result = prime * result + xid;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+
+ BarrierInfo other = (BarrierInfo) obj;
+ return (this.dpid == other.dpid) && (this.xid == other.xid);
+ }
+
+
+ }
+
+ private OFMessageDamper messageDamper = null;
+ private IThreadPoolService threadPool = null;
+
+ private FloodlightContext context = null;
+ private BasicFactory factory = null;
+
+ // Map of threads versus dpid
+ private Map<Long, FlowPusherThread> threadMap = null;
+ // Map from (DPID and transaction ID) to Future objects.
+ private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
+ = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
+
+ private int number_thread;
+
+ /**
+ * Main thread that reads messages from queues and sends them to switches.
+ *
+ * @author Naoki Shiota
+ */
+ private class FlowPusherThread extends Thread {
+ private Map<IOFSwitch, SwitchQueue> assignedQueues
+ = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
+
+ final Lock queuingLock = new ReentrantLock();
+ final Condition messagePushed = queuingLock.newCondition();
+
+ @Override
+ public void run() {
+ this.setName("FlowPusherThread " + this.getId());
+ while (true) {
+ while (!queuesHasMessageToSend()) {
+ queuingLock.lock();
+
+ try {
+ // wait for message pushed to queue
+ messagePushed.await();
+ } catch (InterruptedException e) {
+ // Interrupted to be shut down (not an error)
+ log.debug("FlowPusherThread is interrupted");
+ return;
+ } finally {
+ queuingLock.unlock();
+ }
+ }
+
+ // for safety of concurrent access, copy set of key objects
+ Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
+ for (IOFSwitch sw : assignedQueues.keySet()) {
+ keys.add(sw);
+ }
+
+ for (IOFSwitch sw : keys) {
+ SwitchQueue queue = assignedQueues.get(sw);
+
+ if (sw == null || queue == null) {
+ continue;
+ }
+
+ synchronized (queue) {
+ processQueue(sw, queue, MAX_MESSAGE_SEND);
+ if (queue.toBeDeleted && !queue.hasMessageToSend()) {
+ // remove queue if flagged to be.
+ assignedQueues.remove(sw);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Read messages from queue and send them to the switch.
+ * If number of messages excess the limit, stop sending messages.
+ *
+ * @param sw Switch to which messages will be sent.
+ * @param queue Queue of messages.
+ * @param max_msg Limitation of number of messages to be sent. If set to 0,
+ * all messages in queue will be sent.
+ */
+ private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
+ // check sending rate and determine it to be sent or not
+ long current_time = System.currentTimeMillis();
+ long size = 0;
+
+ if (queue.isSendable(current_time)) {
+ int i = 0;
+ while (queue.hasMessageToSend()) {
+ // Number of messages excess the limit
+ if (0 < max_msg && max_msg <= i) {
+ break;
+ }
+ ++i;
+
+ SwitchQueueEntry queueEntry;
+ synchronized (queue) {
+ queueEntry = queue.poll();
+ }
+
+ OFMessage msg = queueEntry.getOFMessage();
+ try {
+ messageDamper.write(sw, msg, context);
+ if (log.isTraceEnabled()) {
+ log.trace("Pusher sends message : {}", msg);
+ }
+ size += msg.getLength();
+ } catch (IOException e) {
+ e.printStackTrace();
+ log.error("Exception in sending message ({}) : {}", msg, e);
+ }
+ }
+
+ sw.flush();
+ queue.logSentData(current_time, size);
+ }
+ }
+
+ private boolean queuesHasMessageToSend() {
+ for (SwitchQueue queue : assignedQueues.values()) {
+ if (queue.hasMessageToSend()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ private void notifyMessagePushed() {
+ queuingLock.lock();
+ try {
+ messagePushed.signal();
+ } finally {
+ queuingLock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Initialize object with one thread.
+ */
+ public FlowPusher() {
+ number_thread = DEFAULT_NUMBER_THREAD;
+ }
+
+ /**
+ * Initialize object with threads of given number.
+ *
+ * @param number_thread Number of threads to handle messages.
+ */
+ public FlowPusher(int number_thread) {
+ if (number_thread > 0) {
+ this.number_thread = number_thread;
+ } else {
+ this.number_thread = DEFAULT_NUMBER_THREAD;
+ }
+ }
+
+ /**
+ * Set parameters needed for sending messages.
+ *
+ * @param context FloodlightContext used for sending messages.
+ * If null, FlowPusher uses default context.
+ * @param modContext FloodlightModuleContext used for acquiring
+ * ThreadPoolService and registering MessageListener.
+ * @param factory Factory object to create OFMessage objects.
+ * @param damper Message damper used for sending messages.
+ * If null, FlowPusher creates its own damper object.
+ */
+ public void init(FloodlightContext context,
+ FloodlightModuleContext modContext,
+ BasicFactory factory,
+ OFMessageDamper damper) {
+ this.context = context;
+ this.factory = factory;
+ this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
+ IFloodlightProviderService flservice
+ = modContext.getServiceImpl(IFloodlightProviderService.class);
+ flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
+
+ if (damper != null) {
+ messageDamper = damper;
+ } else {
+ // use default values
+ messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
+ EnumSet.of(OFType.FLOW_MOD),
+ OFMESSAGE_DAMPER_TIMEOUT);
+ }
+ }
+
+ /**
+ * Begin processing queue.
+ */
+ public void start() {
+ if (factory == null) {
+ log.error("FlowPusher not yet initialized.");
+ return;
+ }
+
+ threadMap = new HashMap<Long, FlowPusherThread>();
+ for (long i = 0; i < number_thread; ++i) {
+ FlowPusherThread thread = new FlowPusherThread();
+
+ threadMap.put(i, thread);
+ thread.start();
+ }
+ }
+
+ @Override
+ public boolean suspend(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ // create queue in case suspend is called before first message addition
+ queue = createQueueImpl(sw);
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.READY) {
+ queue.state = QueueState.SUSPENDED;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public boolean resume(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ log.error("No queue is attached to DPID : {}", sw.getId());
+ return false;
+ }
+
+ synchronized (queue) {
+ if (queue.state == QueueState.SUSPENDED) {
+ queue.state = QueueState.READY;
+
+ // Free the latch if queue has any messages
+ FlowPusherThread thread = getProcessingThread(sw);
+ if (queue.hasMessageToSend()) {
+ thread.notifyMessagePushed();
+ }
+ return true;
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public QueueState getState(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+
+ if (queue == null) {
+ return QueueState.UNKNOWN;
+ }
+
+ return queue.state;
+ }
+
+ /**
+ * Stop processing queue and exit thread.
+ */
+ public void stop() {
+ if (threadMap == null) {
+ return;
+ }
+
+ for (FlowPusherThread t : threadMap.values()) {
+ t.interrupt();
+ }
+ }
+
+ @Override
+ public void setRate(IOFSwitch sw, long rate) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ if (rate > 0) {
+ log.debug("rate for {} is set to {}", sw.getId(), rate);
+ synchronized (queue) {
+ queue.max_rate = rate;
+ }
+ }
+ }
+
+ @Override
+ public boolean createQueue(IOFSwitch sw) {
+ SwitchQueue queue = createQueueImpl(sw);
+
+ return (queue != null);
+ }
+
+ protected SwitchQueue createQueueImpl(IOFSwitch sw) {
+ SwitchQueue queue = getQueue(sw);
+ if (queue != null) {
+ return queue;
+ }
+
+ FlowPusherThread proc = getProcessingThread(sw);
+ queue = new SwitchQueue();
+ queue.state = QueueState.READY;
+ proc.assignedQueues.put(sw, queue);
+
+ return queue;
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw) {
+ return deleteQueue(sw, false);
+ }
+
+ @Override
+ public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
+ FlowPusherThread proc = getProcessingThread(sw);
+
+ if (forceStop) {
+ SwitchQueue queue = proc.assignedQueues.remove(sw);
+ if (queue == null) {
+ return false;
+ }
+ return true;
+ } else {
+ SwitchQueue queue = getQueue(sw);
+ if (queue == null) {
+ return false;
+ }
+ synchronized (queue) {
+ queue.toBeDeleted = true;
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg) {
+ return add(sw, msg, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ return addMessageImpl(sw, msg, priority);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries) {
+ pushFlowEntries(entries, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntries(
+ Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
+
+ for (Pair<IOFSwitch, FlowEntry> entry : entries) {
+ add(entry.first, entry.second, priority);
+ }
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
+ pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
+ }
+
+ @Override
+ public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ Collection<Pair<IOFSwitch, FlowEntry>> entries =
+ new LinkedList<Pair<IOFSwitch, FlowEntry>>();
+
+ entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
+ pushFlowEntries(entries, priority);
+ }
+
+ /**
+ * Create a message from FlowEntry and add it to the queue of the switch.
+ *
+ * @param sw Switch to which message is pushed.
+ * @param flowEntry FlowEntry object used for creating message.
+ * @return true if message is successfully added to a queue.
+ */
+ private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
+ //
+ // Create the OpenFlow Flow Modification Entry to push
+ //
+ OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ long cookie = flowEntry.flowEntryId().value();
+
+ short flowModCommand = OFFlowMod.OFPFC_ADD;
+ if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
+ flowModCommand = OFFlowMod.OFPFC_ADD;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
+ flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
+ } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
+ flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
+ } else {
+ // Unknown user state. Ignore the entry
+ log.debug(
+ "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
+ flowEntry.flowEntryId(),
+ flowEntry.flowEntryUserState());
+ return false;
+ }
+
+ //
+ // Fetch the match conditions.
+ //
+ // NOTE: The Flow matching conditions common for all Flow Entries are
+ // used ONLY if a Flow Entry does NOT have the corresponding matching
+ // condition set.
+ //
+ OFMatch match = new OFMatch();
+ match.setWildcards(OFMatch.OFPFW_ALL);
+ FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
+
+ // Match the Incoming Port
+ Port matchInPort = flowEntryMatch.inPort();
+ if (matchInPort != null) {
+ match.setInputPort(matchInPort.value());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
+ }
+
+ // Match the Source MAC address
+ MACAddress matchSrcMac = flowEntryMatch.srcMac();
+ if (matchSrcMac != null) {
+ match.setDataLayerSource(matchSrcMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
+ }
+
+ // Match the Destination MAC address
+ MACAddress matchDstMac = flowEntryMatch.dstMac();
+ if (matchDstMac != null) {
+ match.setDataLayerDestination(matchDstMac.toString());
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
+ }
+
+ // Match the Ethernet Frame Type
+ Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
+ if (matchEthernetFrameType != null) {
+ match.setDataLayerType(matchEthernetFrameType);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
+ }
+
+ // Match the VLAN ID
+ Short matchVlanId = flowEntryMatch.vlanId();
+ if (matchVlanId != null) {
+ match.setDataLayerVirtualLan(matchVlanId);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
+ }
+
+ // Match the VLAN priority
+ Byte matchVlanPriority = flowEntryMatch.vlanPriority();
+ if (matchVlanPriority != null) {
+ match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
+ match.setWildcards(match.getWildcards()
+ & ~OFMatch.OFPFW_DL_VLAN_PCP);
+ }
+
+ // Match the Source IPv4 Network prefix
+ IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
+ if (matchSrcIPv4Net != null) {
+ match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
+ }
+
+ // Natch the Destination IPv4 Network prefix
+ IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
+ if (matchDstIPv4Net != null) {
+ match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
+ }
+
+ // Match the IP protocol
+ Byte matchIpProto = flowEntryMatch.ipProto();
+ if (matchIpProto != null) {
+ match.setNetworkProtocol(matchIpProto);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
+ }
+
+ // Match the IP ToS (DSCP field, 6 bits)
+ Byte matchIpToS = flowEntryMatch.ipToS();
+ if (matchIpToS != null) {
+ match.setNetworkTypeOfService(matchIpToS);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
+ }
+
+ // Match the Source TCP/UDP port
+ Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
+ if (matchSrcTcpUdpPort != null) {
+ match.setTransportSource(matchSrcTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
+ }
+
+ // Match the Destination TCP/UDP port
+ Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
+ if (matchDstTcpUdpPort != null) {
+ match.setTransportDestination(matchDstTcpUdpPort);
+ match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
+ }
+
+ //
+ // Fetch the actions
+ //
+ Short actionOutputPort = null;
+ List<OFAction> openFlowActions = new ArrayList<OFAction>();
+ int actionsLen = 0;
+ FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
+ //
+ for (FlowEntryAction action : flowEntryActions.actions()) {
+ ActionOutput actionOutput = action.actionOutput();
+ ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
+ ActionSetVlanPriority actionSetVlanPriority = action
+ .actionSetVlanPriority();
+ ActionStripVlan actionStripVlan = action.actionStripVlan();
+ ActionSetEthernetAddr actionSetEthernetSrcAddr = action
+ .actionSetEthernetSrcAddr();
+ ActionSetEthernetAddr actionSetEthernetDstAddr = action
+ .actionSetEthernetDstAddr();
+ ActionSetIPv4Addr actionSetIPv4SrcAddr = action
+ .actionSetIPv4SrcAddr();
+ ActionSetIPv4Addr actionSetIPv4DstAddr = action
+ .actionSetIPv4DstAddr();
+ ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
+ ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
+ .actionSetTcpUdpSrcPort();
+ ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
+ .actionSetTcpUdpDstPort();
+ ActionEnqueue actionEnqueue = action.actionEnqueue();
+
+ if (actionOutput != null) {
+ actionOutputPort = actionOutput.port().value();
+ // XXX: The max length is hard-coded for now
+ OFActionOutput ofa = new OFActionOutput(actionOutput.port()
+ .value(), (short) 0xffff);
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanId != null) {
+ OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
+ actionSetVlanId.vlanId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetVlanPriority != null) {
+ OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
+ actionSetVlanPriority.vlanPriority());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionStripVlan != null) {
+ if (actionStripVlan.stripVlan() == true) {
+ OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ if (actionSetEthernetSrcAddr != null) {
+ OFActionDataLayerSource ofa = new OFActionDataLayerSource(
+ actionSetEthernetSrcAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetEthernetDstAddr != null) {
+ OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
+ actionSetEthernetDstAddr.addr().toBytes());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4SrcAddr != null) {
+ OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
+ actionSetIPv4SrcAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIPv4DstAddr != null) {
+ OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
+ actionSetIPv4DstAddr.addr().value());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetIpToS != null) {
+ OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
+ actionSetIpToS.ipToS());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpSrcPort != null) {
+ OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
+ actionSetTcpUdpSrcPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionSetTcpUdpDstPort != null) {
+ OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
+ actionSetTcpUdpDstPort.port());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+
+ if (actionEnqueue != null) {
+ OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
+ .value(), actionEnqueue.queueId());
+ openFlowActions.add(ofa);
+ actionsLen += ofa.getLength();
+ }
+ }
+
+ fm.setIdleTimeout((short) flowEntry.idleTimeout())
+ .setHardTimeout((short) flowEntry.hardTimeout())
+ .setPriority((short) flowEntry.priority())
+ .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
+ .setCommand(flowModCommand).setMatch(match)
+ .setActions(openFlowActions)
+ .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
+ fm.setOutPort(OFPort.OFPP_NONE.getValue());
+ if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
+ || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
+ if (actionOutputPort != null)
+ fm.setOutPort(actionOutputPort);
+ }
+
+ //
+ // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
+ // permanent.
+ //
+ if ((flowEntry.idleTimeout() != 0) ||
+ (flowEntry.hardTimeout() != 0)) {
+ fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
+ }
+
+ if (log.isTraceEnabled()) {
+ log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
+ , flowEntry.flowEntryUserState()
+ , sw.getStringId()
+ , flowEntry.flowEntryId()
+ , matchSrcMac
+ , matchDstMac
+ , matchInPort
+ , actionOutputPort
+ );
+ }
+
+ return addMessageImpl(sw, fm, priority);
+ }
+
+ /**
+ * Add message to queue
+ *
+ * @param sw
+ * @param msg
+ * @param flowEntryId
+ * @return
+ */
+ protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
+ FlowPusherThread thread = getProcessingThread(sw);
+
+ SwitchQueue queue = getQueue(sw);
+
+ // create queue at first addition of message
+ if (queue == null) {
+ queue = createQueueImpl(sw);
+ }
+
+ SwitchQueueEntry entry = new SwitchQueueEntry(msg);
+
+ synchronized (queue) {
+ queue.add(entry, priority);
+ if (log.isTraceEnabled()) {
+ log.trace("Message is pushed : {}", entry.getOFMessage());
+ }
+ }
+
+ thread.notifyMessagePushed();
+
+ return true;
+ }
+
+ @Override
+ public OFBarrierReply barrier(IOFSwitch sw) {
+ OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
+ if (future == null) {
+ return null;
+ }
+
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ log.error("InterruptedException: {}", e);
+ return null;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ log.error("ExecutionException: {}", e);
+ return null;
+ }
+ }
+
+ @Override
+ public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
+ // TODO creation of message and future should be moved to OFSwitchImpl
+
+ if (sw == null) {
+ return null;
+ }
+
+ OFBarrierRequest msg = createBarrierRequest(sw);
+
+ OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
+ barrierFutures.put(BarrierInfo.create(sw, msg), future);
+
+ addMessageImpl(sw, msg, MsgPriority.NORMAL);
+
+ return future;
+ }
+
+ protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
+ OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
+ msg.setXid(sw.getNextTransactionId());
+
+ return msg;
+ }
+
+ /**
+ * Get a queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Queue object
+ */
+ protected SwitchQueue getQueue(IOFSwitch sw) {
+ if (sw == null) {
+ return null;
+ }
+
+ FlowPusherThread th = getProcessingThread(sw);
+ if (th == null) {
+ return null;
+ }
+
+ return th.assignedQueues.get(sw);
+ }
+
+ /**
+ * Get a hash value correspondent to a switch.
+ *
+ * @param sw Switch object
+ * @return Hash value
+ */
+ protected long getHash(IOFSwitch sw) {
+ // This code assumes DPID is sequentially assigned.
+ // TODO consider equalization algorithm
+ return sw.getId() % number_thread;
+ }
+
+ /**
+ * Get a Thread object which processes the queue attached to a switch.
+ *
+ * @param sw Switch object
+ * @return Thread object
+ */
+ protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
+ long hash = getHash(sw);
+
+ return threadMap.get(hash);
+ }
+
+ @Override
+ public String getName() {
+ return "flowpusher";
+ }
+
+ @Override
+ public boolean isCallbackOrderingPrereq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public boolean isCallbackOrderingPostreq(OFType type, String name) {
+ return false;
+ }
+
+ @Override
+ public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
+ if (log.isTraceEnabled()) {
+ log.trace("Received BARRIER_REPLY from : {}", sw.getId());
+ }
+
+ if (msg.getType() != OFType.BARRIER_REPLY) {
+ log.error("Unexpected reply message : {}", msg.getType());
+ return Command.CONTINUE;
+ }
+
+ OFBarrierReply reply = (OFBarrierReply) msg;
+ BarrierInfo info = BarrierInfo.create(sw, reply);
+
+ // Deliver future if exists
+ OFBarrierReplyFuture future = barrierFutures.get(info);
+ if (future != null) {
+ future.deliverFuture(sw, msg);
+ barrierFutures.remove(info);
+ }
+
+ return Command.CONTINUE;
+ }
+}