blob: 2c64ec1f7b6ace20b6b7c0f551e563a716a3103e [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07009import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070013import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070014import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070021import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
Jonathan Hart23701d12014-04-03 10:45:48 -070030import net.onrc.onos.core.util.FlowEntry;
31import net.onrc.onos.core.util.FlowEntryAction;
Jonathan Harta99ec672014-04-03 11:30:34 -070032import net.onrc.onos.core.util.FlowEntryAction.ActionEnqueue;
33import net.onrc.onos.core.util.FlowEntryAction.ActionOutput;
34import net.onrc.onos.core.util.FlowEntryAction.ActionSetEthernetAddr;
35import net.onrc.onos.core.util.FlowEntryAction.ActionSetIPv4Addr;
36import net.onrc.onos.core.util.FlowEntryAction.ActionSetIpToS;
37import net.onrc.onos.core.util.FlowEntryAction.ActionSetTcpUdpPort;
38import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanId;
39import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanPriority;
40import net.onrc.onos.core.util.FlowEntryAction.ActionStripVlan;
Jonathan Hart23701d12014-04-03 10:45:48 -070041import net.onrc.onos.core.util.FlowEntryActions;
42import net.onrc.onos.core.util.FlowEntryMatch;
43import net.onrc.onos.core.util.FlowEntryUserState;
44import net.onrc.onos.core.util.IPv4Net;
45import net.onrc.onos.core.util.Pair;
Yuta HIGUCHIfb564502014-06-16 21:29:00 -070046import net.onrc.onos.core.util.PortNumber;
Jonathan Harta99ec672014-04-03 11:30:34 -070047
48import org.openflow.protocol.OFBarrierReply;
49import org.openflow.protocol.OFBarrierRequest;
50import org.openflow.protocol.OFFlowMod;
51import org.openflow.protocol.OFMatch;
52import org.openflow.protocol.OFMessage;
53import org.openflow.protocol.OFPacketOut;
54import org.openflow.protocol.OFPort;
55import org.openflow.protocol.OFType;
56import org.openflow.protocol.action.OFAction;
57import org.openflow.protocol.action.OFActionDataLayerDestination;
58import org.openflow.protocol.action.OFActionDataLayerSource;
59import org.openflow.protocol.action.OFActionEnqueue;
60import org.openflow.protocol.action.OFActionNetworkLayerDestination;
61import org.openflow.protocol.action.OFActionNetworkLayerSource;
62import org.openflow.protocol.action.OFActionNetworkTypeOfService;
63import org.openflow.protocol.action.OFActionOutput;
64import org.openflow.protocol.action.OFActionStripVirtualLan;
65import org.openflow.protocol.action.OFActionTransportLayerDestination;
66import org.openflow.protocol.action.OFActionTransportLayerSource;
67import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
68import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
69import org.openflow.protocol.factory.BasicFactory;
70import org.slf4j.Logger;
71import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072
Jonathan Hartc00f5c22014-06-10 15:14:40 -070073import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
74
/**
 * FlowPusher is an implementation of FlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor,
 * and one thread can handle multiple message queues. Each queue is assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin order. The processing thread also tracks the sending
 * rate in order to suppress excessive message sending.
 */
Ray Milkey1584ec82014-04-10 11:58:30 -070085public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
// Logger shared by FlowPusher and its nested classes.
private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
// Thread count used by the no-arg constructor and when a non-positive
// count is passed to FlowPusher(int).
protected static final int DEFAULT_NUMBER_THREAD = 1;

// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
// Both values are only used to build the default OFMessageDamper in init().
protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms

// Number of messages sent to switch at once
// (upper bound handed to processQueue() for one pass over a queue).
protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 private static class SwitchQueueEntry {
99 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700100
Ray Milkey8e5170e2014-04-02 12:09:55 -0700101 public SwitchQueueEntry(OFMessage msg) {
102 this.msg = msg;
103 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700104
Ray Milkey8e5170e2014-04-02 12:09:55 -0700105 public OFMessage getOFMessage() {
106 return msg;
107 }
108 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700109
Ray Milkey8e5170e2014-04-02 12:09:55 -0700110 /**
111 * SwitchQueue represents message queue attached to a switch.
112 * This consists of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700113 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -0700114 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -0700115 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700116 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800117
Ray Milkey8e5170e2014-04-02 12:09:55 -0700118 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Ray Milkey2476cac2014-04-08 11:03:21 -0700119 long maxRate = 0; // 0 indicates no limitation
120 long lastSentTime = 0;
121 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -0700122
Ray Milkey8e5170e2014-04-02 12:09:55 -0700123 // "To be deleted" flag
124 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800125
Ray Milkey8e5170e2014-04-02 12:09:55 -0700126 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700127 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 MsgPriority.values().length);
129 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700130 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700131 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800132
Ray Milkey8e5170e2014-04-02 12:09:55 -0700133 state = QueueState.READY;
134 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800135
Ray Milkey8e5170e2014-04-02 12:09:55 -0700136 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700137 * Check if sending rate is within the rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700138 *
139 * @param current Current time
140 * @return true if within the rate
141 */
142 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700143 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700144 // no limitation
145 return true;
146 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800147
Ray Milkey2476cac2014-04-08 11:03:21 -0700148 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700149 return false;
150 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800151
Ray Milkey8e5170e2014-04-02 12:09:55 -0700152 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700153 long rate = lastSentSize / (current - lastSentTime);
154 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700155 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800156
Ray Milkey8e5170e2014-04-02 12:09:55 -0700157 /**
158 * Log time and size of last sent data.
159 *
160 * @param current Time to be sent.
161 * @param size Size of sent data (in bytes).
162 */
163 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700164 lastSentTime = current;
165 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700166 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700167
Ray Milkey8e5170e2014-04-02 12:09:55 -0700168 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
169 Queue<SwitchQueueEntry> queue = getQueue(priority);
170 if (queue == null) {
171 log.error("Unexpected priority : ", priority);
172 return false;
173 }
174 return queue.add(entry);
175 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800176
Ray Milkey8e5170e2014-04-02 12:09:55 -0700177 /**
178 * Poll single appropriate entry object according to QueueState.
179 *
180 * @return Entry object.
181 */
182 SwitchQueueEntry poll() {
183 switch (state) {
184 case READY: {
Ray Milkey2476cac2014-04-08 11:03:21 -0700185 for (int i = 0; i < rawQueues.size(); ++i) {
186 SwitchQueueEntry entry = rawQueues.get(i).poll();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700187 if (entry != null) {
188 return entry;
189 }
190 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800191
Ray Milkey8e5170e2014-04-02 12:09:55 -0700192 return null;
193 }
194 case SUSPENDED: {
195 // Only polling from high priority queue
196 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
197 return entry;
198 }
199 default:
200 log.error("Unexpected QueueState : ", state);
201 return null;
202 }
203 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800204
Ray Milkey8e5170e2014-04-02 12:09:55 -0700205 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700206 * Check if this object has any messages in the queues to be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700207 *
208 * @return True if there are some messages to be sent.
209 */
210 boolean hasMessageToSend() {
211 switch (state) {
212 case READY:
Ray Milkey2476cac2014-04-08 11:03:21 -0700213 for (Queue<SwitchQueueEntry> queue : rawQueues) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700214 if (!queue.isEmpty()) {
215 return true;
216 }
217 }
218 break;
219 case SUSPENDED:
220 // Only checking high priority queue
221 return (!getQueue(MsgPriority.HIGH).isEmpty());
222 default:
223 log.error("Unexpected QueueState : ", state);
224 return false;
225 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800226
Ray Milkey8e5170e2014-04-02 12:09:55 -0700227 return false;
228 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800229
Ray Milkey8e5170e2014-04-02 12:09:55 -0700230 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700231 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 }
233 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800234
Ray Milkey8e5170e2014-04-02 12:09:55 -0700235 /**
236 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700237 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700238 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700239 final long dpid;
240 final int xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800241
Ray Milkey8e5170e2014-04-02 12:09:55 -0700242 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
243 return new BarrierInfo(sw.getId(), req.getXid());
244 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800245
Ray Milkey8e5170e2014-04-02 12:09:55 -0700246 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
247 return new BarrierInfo(sw.getId(), rpy.getXid());
248 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800249
Ray Milkey8e5170e2014-04-02 12:09:55 -0700250 private BarrierInfo(long dpid, int xid) {
251 this.dpid = dpid;
252 this.xid = xid;
253 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800254
Ray Milkey8e5170e2014-04-02 12:09:55 -0700255 // Auto generated code by Eclipse
256 @Override
257 public int hashCode() {
258 final int prime = 31;
259 int result = 1;
260 result = prime * result + (int) (dpid ^ (dpid >>> 32));
261 result = prime * result + xid;
262 return result;
263 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800264
Ray Milkey8e5170e2014-04-02 12:09:55 -0700265 @Override
266 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700267 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700268 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700269 }
270 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700271 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700272 }
273 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700274 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700275 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800276
Ray Milkey8e5170e2014-04-02 12:09:55 -0700277 BarrierInfo other = (BarrierInfo) obj;
278 return (this.dpid == other.dpid) && (this.xid == other.xid);
279 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800280
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800281
Ray Milkey8e5170e2014-04-02 12:09:55 -0700282 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800283
// Damper through which messages are written to switches; assigned in init().
private OFMessageDamper messageDamper = null;
// Thread pool service looked up from the module context in init().
private IThreadPoolService threadPool = null;

// Context passed along with every message written through the damper.
private FloodlightContext context = null;
// Factory handed in via init(); start() refuses to run while this is null.
private BasicFactory factory = null;

// Map of threads versus dpid
// (start() populates it with keys 0 .. numberThread - 1).
private Map<Long, FlowPusherThread> threadMap = null;
// Map from (DPID and transaction ID) to Future objects.
private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
        = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();

// Number of processing threads created by start().
private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800297
Ray Milkey8e5170e2014-04-02 12:09:55 -0700298 /**
299 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700300 */
301 private class FlowPusherThread extends Thread {
302 private Map<IOFSwitch, SwitchQueue> assignedQueues
303 = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800304
Ray Milkey8e5170e2014-04-02 12:09:55 -0700305 final Lock queuingLock = new ReentrantLock();
306 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800307
Ray Milkey8e5170e2014-04-02 12:09:55 -0700308 @Override
309 public void run() {
310 this.setName("FlowPusherThread " + this.getId());
311 while (true) {
312 while (!queuesHasMessageToSend()) {
313 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800314
Ray Milkey8e5170e2014-04-02 12:09:55 -0700315 try {
316 // wait for message pushed to queue
317 messagePushed.await();
318 } catch (InterruptedException e) {
319 // Interrupted to be shut down (not an error)
320 log.debug("FlowPusherThread is interrupted");
321 return;
322 } finally {
323 queuingLock.unlock();
324 }
325 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800326
Naoki Shiota47993102014-07-09 14:00:54 -0700327 for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues.entrySet().iterator();
328 it.hasNext();
329 ) {
330 Entry<IOFSwitch, SwitchQueue> entry = it.next();
331 IOFSwitch sw = entry.getKey();
332 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800333
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700334 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700335 continue;
336 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800337
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 synchronized (queue) {
339 processQueue(sw, queue, MAX_MESSAGE_SEND);
340 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
341 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700342 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700343 }
344 }
345 }
346 }
347 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800348
Ray Milkey8e5170e2014-04-02 12:09:55 -0700349 /**
350 * Read messages from queue and send them to the switch.
351 * If number of messages excess the limit, stop sending messages.
352 *
353 * @param sw Switch to which messages will be sent.
354 * @param queue Queue of messages.
Ray Milkey9526d6f2014-04-10 14:54:15 -0700355 * @param maxMsg Limitation of number of messages to be sent. If set to 0,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700356 * all messages in queue will be sent.
357 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700358 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700359 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700360 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700361 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800362
Naoki Shiota47993102014-07-09 14:00:54 -0700363 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700364 int i = 0;
365 while (queue.hasMessageToSend()) {
366 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700367 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700368 break;
369 }
370 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800371
Ray Milkey8e5170e2014-04-02 12:09:55 -0700372 SwitchQueueEntry queueEntry;
373 synchronized (queue) {
374 queueEntry = queue.poll();
375 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800376
Ray Milkey8e5170e2014-04-02 12:09:55 -0700377 OFMessage msg = queueEntry.getOFMessage();
378 try {
379 messageDamper.write(sw, msg, context);
380 if (log.isTraceEnabled()) {
381 log.trace("Pusher sends message : {}", msg);
382 }
383 size += msg.getLength();
384 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700385 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700386 }
387 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800388
Ray Milkey8e5170e2014-04-02 12:09:55 -0700389 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700390 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700391 }
392 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800393
Ray Milkey8e5170e2014-04-02 12:09:55 -0700394 private boolean queuesHasMessageToSend() {
395 for (SwitchQueue queue : assignedQueues.values()) {
396 if (queue.hasMessageToSend()) {
397 return true;
398 }
399 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700400
Ray Milkey8e5170e2014-04-02 12:09:55 -0700401 return false;
402 }
Naoki Shiota05334692014-03-18 16:06:36 -0700403
Ray Milkey8e5170e2014-04-02 12:09:55 -0700404 private void notifyMessagePushed() {
405 queuingLock.lock();
406 try {
407 messagePushed.signal();
408 } finally {
409 queuingLock.unlock();
410 }
411 }
412 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700413
/**
 * Creates a pusher that uses the default number of processing threads
 * (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this.numberThread = DEFAULT_NUMBER_THREAD;
}
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800420
/**
 * Creates a pusher with the requested number of processing threads.
 * A non-positive request falls back to DEFAULT_NUMBER_THREAD.
 *
 * @param numberThreadValue Number of threads to handle messages.
 */
public FlowPusher(int numberThreadValue) {
    this.numberThread = (numberThreadValue > 0)
            ? numberThreadValue
            : DEFAULT_NUMBER_THREAD;
}
Naoki Shiota81dbe302013-11-21 15:35:38 -0800433
Ray Milkey8e5170e2014-04-02 12:09:55 -0700434 /**
435 * Set parameters needed for sending messages.
436 *
Ray Milkey5df613b2014-04-15 10:50:56 -0700437 * @param floodlightContext FloodlightContext used for sending messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700438 * If null, FlowPusher uses default context.
439 * @param modContext FloodlightModuleContext used for acquiring
440 * ThreadPoolService and registering MessageListener.
Ray Milkey5df613b2014-04-15 10:50:56 -0700441 * @param basicFactory Factory object to create OFMessage objects.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700442 * @param damper Message damper used for sending messages.
443 * If null, FlowPusher creates its own damper object.
444 */
Ray Milkey5df613b2014-04-15 10:50:56 -0700445 public void init(FloodlightContext floodlightContext,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700446 FloodlightModuleContext modContext,
Ray Milkey5df613b2014-04-15 10:50:56 -0700447 BasicFactory basicFactory,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700448 OFMessageDamper damper) {
Ray Milkey5df613b2014-04-15 10:50:56 -0700449 context = floodlightContext;
450 factory = basicFactory;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700451 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
452 IFloodlightProviderService flservice
453 = modContext.getServiceImpl(IFloodlightProviderService.class);
454 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800455
Ray Milkey8e5170e2014-04-02 12:09:55 -0700456 if (damper != null) {
457 messageDamper = damper;
458 } else {
459 // use default values
460 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
461 EnumSet.of(OFType.FLOW_MOD),
462 OFMESSAGE_DAMPER_TIMEOUT);
463 }
464 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800465
Ray Milkey8e5170e2014-04-02 12:09:55 -0700466 /**
467 * Begin processing queue.
468 */
469 public void start() {
470 if (factory == null) {
471 log.error("FlowPusher not yet initialized.");
472 return;
473 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800474
Ray Milkey8e5170e2014-04-02 12:09:55 -0700475 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700476 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700477 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800478
Ray Milkey8e5170e2014-04-02 12:09:55 -0700479 threadMap.put(i, thread);
480 thread.start();
481 }
482 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800483
Ray Milkey8e5170e2014-04-02 12:09:55 -0700484 @Override
485 public boolean suspend(IOFSwitch sw) {
486 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700487
Ray Milkey8e5170e2014-04-02 12:09:55 -0700488 if (queue == null) {
489 // create queue in case suspend is called before first message addition
490 queue = createQueueImpl(sw);
491 }
492
493 synchronized (queue) {
494 if (queue.state == QueueState.READY) {
495 queue.state = QueueState.SUSPENDED;
496 return true;
497 }
498 return false;
499 }
500 }
501
502 @Override
503 public boolean resume(IOFSwitch sw) {
504 SwitchQueue queue = getQueue(sw);
505
506 if (queue == null) {
507 log.error("No queue is attached to DPID : {}", sw.getId());
508 return false;
509 }
510
511 synchronized (queue) {
512 if (queue.state == QueueState.SUSPENDED) {
513 queue.state = QueueState.READY;
514
515 // Free the latch if queue has any messages
516 FlowPusherThread thread = getProcessingThread(sw);
517 if (queue.hasMessageToSend()) {
518 thread.notifyMessagePushed();
519 }
520 return true;
521 }
522 return false;
523 }
524 }
525
526 @Override
527 public QueueState getState(IOFSwitch sw) {
528 SwitchQueue queue = getQueue(sw);
529
530 if (queue == null) {
531 return QueueState.UNKNOWN;
532 }
533
534 return queue.state;
535 }
536
537 /**
538 * Stop processing queue and exit thread.
539 */
540 public void stop() {
541 if (threadMap == null) {
542 return;
543 }
544
545 for (FlowPusherThread t : threadMap.values()) {
546 t.interrupt();
547 }
548 }
549
550 @Override
551 public void setRate(IOFSwitch sw, long rate) {
552 SwitchQueue queue = getQueue(sw);
553 if (queue == null) {
554 queue = createQueueImpl(sw);
555 }
556
557 if (rate > 0) {
558 log.debug("rate for {} is set to {}", sw.getId(), rate);
559 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700560 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700561 }
562 }
563 }
564
565 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700566 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
567 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700568 public boolean createQueue(IOFSwitch sw) {
569 SwitchQueue queue = createQueueImpl(sw);
570
571 return (queue != null);
572 }
573
574 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
575 SwitchQueue queue = getQueue(sw);
576 if (queue != null) {
577 return queue;
578 }
579
580 FlowPusherThread proc = getProcessingThread(sw);
581 queue = new SwitchQueue();
582 queue.state = QueueState.READY;
583 proc.assignedQueues.put(sw, queue);
584
585 return queue;
586 }
587
588 @Override
589 public boolean deleteQueue(IOFSwitch sw) {
590 return deleteQueue(sw, false);
591 }
592
593 @Override
594 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
595 FlowPusherThread proc = getProcessingThread(sw);
596
597 if (forceStop) {
598 SwitchQueue queue = proc.assignedQueues.remove(sw);
599 if (queue == null) {
600 return false;
601 }
602 return true;
603 } else {
604 SwitchQueue queue = getQueue(sw);
605 if (queue == null) {
606 return false;
607 }
608 synchronized (queue) {
609 queue.toBeDeleted = true;
610 }
611 return true;
612 }
613 }
614
615 @Override
616 public boolean add(IOFSwitch sw, OFMessage msg) {
617 return add(sw, msg, MsgPriority.NORMAL);
618 }
619
620 @Override
621 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
622 return addMessageImpl(sw, msg, priority);
623 }
624
625 @Override
626 public void pushFlowEntries(
627 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
628 pushFlowEntries(entries, MsgPriority.NORMAL);
629 }
630
631 @Override
632 public void pushFlowEntries(
633 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
634
635 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Sho SHIMIZU26d77892014-06-10 11:07:06 -0700636 add(entry.getFirst(), entry.getSecond(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700637 }
638 }
639
640 @Override
641 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
642 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
643 }
644
645 @Override
646 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
647 Collection<Pair<IOFSwitch, FlowEntry>> entries =
648 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
649
650 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
651 pushFlowEntries(entries, priority);
652 }
653
654 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700655 * Fetch the match conditions.
656 * NOTE: The Flow matching conditions common for all Flow Entries are
657 * used ONLY if a Flow Entry does NOT have the corresponding matching
658 * condition set.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700659 *
Ray Milkeyddcd4922014-04-17 11:21:20 -0700660 * @param flowEntryMatch Flow entry to create a matcher for
661 * @return open flow matcher for the given values
Ray Milkey8e5170e2014-04-02 12:09:55 -0700662 */
Ray Milkeyddcd4922014-04-17 11:21:20 -0700663 private OFMatch computeMatch(FlowEntryMatch flowEntryMatch) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700664 OFMatch match = new OFMatch();
665 match.setWildcards(OFMatch.OFPFW_ALL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700666
667 // Match the Incoming Port
Yuta HIGUCHIfb564502014-06-16 21:29:00 -0700668 PortNumber matchInPort = flowEntryMatch.inPort();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700669 if (matchInPort != null) {
Yuta HIGUCHI9da3a6e2014-06-10 22:11:58 -0700670 match.setInputPort(matchInPort.shortValue());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700671 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
672 }
673
674 // Match the Source MAC address
675 MACAddress matchSrcMac = flowEntryMatch.srcMac();
676 if (matchSrcMac != null) {
677 match.setDataLayerSource(matchSrcMac.toString());
678 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
679 }
680
681 // Match the Destination MAC address
682 MACAddress matchDstMac = flowEntryMatch.dstMac();
683 if (matchDstMac != null) {
684 match.setDataLayerDestination(matchDstMac.toString());
685 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
686 }
687
688 // Match the Ethernet Frame Type
689 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
690 if (matchEthernetFrameType != null) {
691 match.setDataLayerType(matchEthernetFrameType);
692 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
693 }
694
695 // Match the VLAN ID
696 Short matchVlanId = flowEntryMatch.vlanId();
697 if (matchVlanId != null) {
698 match.setDataLayerVirtualLan(matchVlanId);
699 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
700 }
701
702 // Match the VLAN priority
703 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
704 if (matchVlanPriority != null) {
705 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
706 match.setWildcards(match.getWildcards()
707 & ~OFMatch.OFPFW_DL_VLAN_PCP);
708 }
709
710 // Match the Source IPv4 Network prefix
711 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
712 if (matchSrcIPv4Net != null) {
713 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
714 }
715
716 // Natch the Destination IPv4 Network prefix
717 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
718 if (matchDstIPv4Net != null) {
719 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
720 }
721
722 // Match the IP protocol
723 Byte matchIpProto = flowEntryMatch.ipProto();
724 if (matchIpProto != null) {
725 match.setNetworkProtocol(matchIpProto);
726 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
727 }
728
729 // Match the IP ToS (DSCP field, 6 bits)
730 Byte matchIpToS = flowEntryMatch.ipToS();
731 if (matchIpToS != null) {
732 match.setNetworkTypeOfService(matchIpToS);
733 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
734 }
735
736 // Match the Source TCP/UDP port
737 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
738 if (matchSrcTcpUdpPort != null) {
739 match.setTransportSource(matchSrcTcpUdpPort);
740 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
741 }
742
743 // Match the Destination TCP/UDP port
744 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
745 if (matchDstTcpUdpPort != null) {
746 match.setTransportDestination(matchDstTcpUdpPort);
747 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
748 }
749
Ray Milkeyddcd4922014-04-17 11:21:20 -0700750 return match;
751 }
752
753
754 /**
755 * Wrapper object to hold a port number. Used to pass around output ports.
756 */
757 private static class OutputPort {
758 private Short portNumber;
759 }
760
761 /**
762 * Process a flow action entry, putting the resulting flow
763 * actions into a list. Will also set the actionOutputPort
764 * if one is encountered while processing an action.
765 *
766 * @param action Flow Entry Action to process
767 * @param openFlowActions actions to perform get added to this list
768 * @param actionOutputPort this will get set if an action output
769 * port is found
770 */
771 private void processAction(final FlowEntryAction action,
772 final List<OFAction> openFlowActions,
773 final OutputPort actionOutputPort) {
774 ActionOutput actionOutput = action.actionOutput();
775 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
776 ActionSetVlanPriority actionSetVlanPriority = action
777 .actionSetVlanPriority();
778 ActionStripVlan actionStripVlan = action.actionStripVlan();
779 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
780 .actionSetEthernetSrcAddr();
781 ActionSetEthernetAddr actionSetEthernetDstAddr = action
782 .actionSetEthernetDstAddr();
783 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
784 .actionSetIPv4SrcAddr();
785 ActionSetIPv4Addr actionSetIPv4DstAddr = action
786 .actionSetIPv4DstAddr();
787 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
788 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
789 .actionSetTcpUdpSrcPort();
790 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
791 .actionSetTcpUdpDstPort();
792 ActionEnqueue actionEnqueue = action.actionEnqueue();
793
794 if (actionOutput != null) {
Yuta HIGUCHI9da3a6e2014-06-10 22:11:58 -0700795 actionOutputPort.portNumber = actionOutput.port().shortValue();
Ray Milkeyddcd4922014-04-17 11:21:20 -0700796 // XXX: The max length is hard-coded for now
797 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
Yuta HIGUCHI9da3a6e2014-06-10 22:11:58 -0700798 .shortValue(), (short) 0xffff);
Ray Milkeyddcd4922014-04-17 11:21:20 -0700799 openFlowActions.add(ofa);
800 }
801
802 if (actionSetVlanId != null) {
803 OFActionVirtualLanIdentifier ofa =
804 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
805 openFlowActions.add(ofa);
806 }
807
808 if (actionSetVlanPriority != null) {
809 OFActionVirtualLanPriorityCodePoint ofa =
810 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
811 openFlowActions.add(ofa);
812 }
813
814 if (actionStripVlan != null) {
815 if (actionStripVlan.stripVlan()) {
816 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
817 openFlowActions.add(ofa);
818 }
819 }
820
821 if (actionSetEthernetSrcAddr != null) {
822 OFActionDataLayerSource ofa =
823 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
824 openFlowActions.add(ofa);
825 }
826
827 if (actionSetEthernetDstAddr != null) {
828 OFActionDataLayerDestination ofa =
829 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
830 openFlowActions.add(ofa);
831 }
832
833 if (actionSetIPv4SrcAddr != null) {
834 OFActionNetworkLayerSource ofa =
835 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
836 openFlowActions.add(ofa);
837 }
838
839 if (actionSetIPv4DstAddr != null) {
840 OFActionNetworkLayerDestination ofa =
841 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
842 openFlowActions.add(ofa);
843 }
844
845 if (actionSetIpToS != null) {
846 OFActionNetworkTypeOfService ofa =
847 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
848 openFlowActions.add(ofa);
849 }
850
851 if (actionSetTcpUdpSrcPort != null) {
852 OFActionTransportLayerSource ofa =
853 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
854 openFlowActions.add(ofa);
855 }
856
857 if (actionSetTcpUdpDstPort != null) {
858 OFActionTransportLayerDestination ofa =
859 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
860 openFlowActions.add(ofa);
861 }
862
863 if (actionEnqueue != null) {
864 OFActionEnqueue ofa =
Yuta HIGUCHI9da3a6e2014-06-10 22:11:58 -0700865 new OFActionEnqueue(actionEnqueue.port().shortValue(), actionEnqueue.queueId());
Ray Milkeyddcd4922014-04-17 11:21:20 -0700866 openFlowActions.add(ofa);
867 }
868 }
869
870
871 /**
872 * Create a message from FlowEntry and add it to the queue of the switch.
873 *
874 * @param sw Switch to which message is pushed.
875 * @param flowEntry FlowEntry object used for creating message.
876 * @return true if message is successfully added to a queue.
877 */
878 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
879 //
880 // Create the OpenFlow Flow Modification Entry to push
881 //
882 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
883 long cookie = flowEntry.flowEntryId().value();
884
885 short flowModCommand = OFFlowMod.OFPFC_ADD;
886 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
887 flowModCommand = OFFlowMod.OFPFC_ADD;
888 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
889 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
890 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
891 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
892 } else {
893 // Unknown user state. Ignore the entry
894 log.debug(
895 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
896 flowEntry.flowEntryId(),
897 flowEntry.flowEntryUserState());
898 return false;
899 }
900
901 final FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
902 final OFMatch match = computeMatch(flowEntryMatch);
903
Yuta HIGUCHIfb564502014-06-16 21:29:00 -0700904 final PortNumber matchInPort = flowEntryMatch.inPort();
Ray Milkeyddcd4922014-04-17 11:21:20 -0700905 final MACAddress matchSrcMac = flowEntryMatch.srcMac();
906 final MACAddress matchDstMac = flowEntryMatch.dstMac();
907
Ray Milkey8e5170e2014-04-02 12:09:55 -0700908 //
909 // Fetch the actions
910 //
Ray Milkeyddcd4922014-04-17 11:21:20 -0700911 final List<OFAction> openFlowActions = new ArrayList<OFAction>();
912 final FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700913 //
Ray Milkeyddcd4922014-04-17 11:21:20 -0700914 final OutputPort actionOutputPort = new OutputPort();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700915 for (FlowEntryAction action : flowEntryActions.actions()) {
Ray Milkeyddcd4922014-04-17 11:21:20 -0700916 processAction(action, openFlowActions, actionOutputPort);
917 }
918 int actionsLen = 0;
919 for (OFAction ofa : openFlowActions) {
920 actionsLen += ofa.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700921 }
922
923 fm.setIdleTimeout((short) flowEntry.idleTimeout())
924 .setHardTimeout((short) flowEntry.hardTimeout())
925 .setPriority((short) flowEntry.priority())
926 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
927 .setCommand(flowModCommand).setMatch(match)
928 .setActions(openFlowActions)
929 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
930 fm.setOutPort(OFPort.OFPP_NONE.getValue());
931 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
932 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
Ray Milkeyddcd4922014-04-17 11:21:20 -0700933 if (actionOutputPort.portNumber != null) {
934 fm.setOutPort(actionOutputPort.portNumber);
Ray Milkeyb29e6262014-04-09 16:02:14 -0700935 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700936 }
937
938 //
939 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
940 // permanent.
941 //
942 if ((flowEntry.idleTimeout() != 0) ||
943 (flowEntry.hardTimeout() != 0)) {
944 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
945 }
946
947 if (log.isTraceEnabled()) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700948 log.trace("Installing flow entry {} into switch DPID: {} " +
949 "flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
Ray Milkey8e5170e2014-04-02 12:09:55 -0700950 , flowEntry.flowEntryUserState()
951 , sw.getStringId()
952 , flowEntry.flowEntryId()
953 , matchSrcMac
954 , matchDstMac
955 , matchInPort
956 , actionOutputPort
957 );
958 }
959
960 return addMessageImpl(sw, fm, priority);
961 }
962
963 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700964 * Add message to queue.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700965 *
966 * @param sw
967 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700968 * @param priority
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700969 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700970 */
971 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
972 FlowPusherThread thread = getProcessingThread(sw);
973
974 SwitchQueue queue = getQueue(sw);
975
976 // create queue at first addition of message
977 if (queue == null) {
978 queue = createQueueImpl(sw);
979 }
980
981 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
982
983 synchronized (queue) {
984 queue.add(entry, priority);
985 if (log.isTraceEnabled()) {
986 log.trace("Message is pushed : {}", entry.getOFMessage());
987 }
988 }
989
990 thread.notifyMessagePushed();
991
992 return true;
993 }
994
995 @Override
996 public OFBarrierReply barrier(IOFSwitch sw) {
997 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
998 if (future == null) {
999 return null;
1000 }
1001
1002 try {
1003 return future.get();
1004 } catch (InterruptedException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -07001005 log.error("InterruptedException:", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -07001006 return null;
1007 } catch (ExecutionException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -07001008 log.error("ExecutionException:", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -07001009 return null;
1010 }
1011 }
1012
1013 @Override
1014 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1015 // TODO creation of message and future should be moved to OFSwitchImpl
1016
1017 if (sw == null) {
1018 return null;
1019 }
1020
1021 OFBarrierRequest msg = createBarrierRequest(sw);
1022
1023 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
1024 barrierFutures.put(BarrierInfo.create(sw, msg), future);
1025
1026 addMessageImpl(sw, msg, MsgPriority.NORMAL);
1027
1028 return future;
1029 }
1030
1031 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
1032 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
1033 msg.setXid(sw.getNextTransactionId());
1034
1035 return msg;
1036 }
1037
1038 /**
1039 * Get a queue attached to a switch.
1040 *
1041 * @param sw Switch object
1042 * @return Queue object
1043 */
1044 protected SwitchQueue getQueue(IOFSwitch sw) {
1045 if (sw == null) {
1046 return null;
1047 }
1048
1049 FlowPusherThread th = getProcessingThread(sw);
1050 if (th == null) {
1051 return null;
1052 }
1053
1054 return th.assignedQueues.get(sw);
1055 }
1056
1057 /**
1058 * Get a hash value correspondent to a switch.
1059 *
1060 * @param sw Switch object
1061 * @return Hash value
1062 */
1063 protected long getHash(IOFSwitch sw) {
1064 // This code assumes DPID is sequentially assigned.
1065 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -07001066 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -07001067 }
1068
1069 /**
1070 * Get a Thread object which processes the queue attached to a switch.
1071 *
1072 * @param sw Switch object
1073 * @return Thread object
1074 */
1075 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
1076 long hash = getHash(sw);
1077
1078 return threadMap.get(hash);
1079 }
1080
1081 @Override
1082 public String getName() {
1083 return "flowpusher";
1084 }
1085
1086 @Override
1087 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1088 return false;
1089 }
1090
1091 @Override
1092 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1093 return false;
1094 }
1095
1096 @Override
1097 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
1098 if (log.isTraceEnabled()) {
1099 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1100 }
1101
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -07001102 if ((msg.getType() != OFType.BARRIER_REPLY) ||
1103 !(msg instanceof OFBarrierReply)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -07001104 log.error("Unexpected reply message : {}", msg.getType());
1105 return Command.CONTINUE;
1106 }
1107
1108 OFBarrierReply reply = (OFBarrierReply) msg;
1109 BarrierInfo info = BarrierInfo.create(sw, reply);
1110
1111 // Deliver future if exists
1112 OFBarrierReplyFuture future = barrierFutures.get(info);
1113 if (future != null) {
1114 future.deliverFuture(sw, msg);
1115 barrierFutures.remove(info);
1116 }
1117
1118 return Command.CONTINUE;
1119 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001120}