blob: 3340ed090ae60b0ca1e595276d97c0aa5230e46b [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08008import java.util.HashSet;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08009import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070010import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080011import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080012import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080013import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070014
Naoki Shiota7d0cf272013-11-05 10:18:12 -080015import org.openflow.protocol.*;
16import org.openflow.protocol.action.*;
17import org.openflow.protocol.factory.BasicFactory;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
21import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080030import net.onrc.onos.ofcontroller.util.FlowEntryAction;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080032import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryActions;
34import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
36import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.IPv4Net;
38import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070039
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor,
 * and one thread can handle multiple message queues. Each queue is assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin. The processing thread also tracks the sending rate
 * to suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080052public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080053 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
54
55 // NOTE: Below are moved from FlowManager.
56 // TODO: Values copied from elsewhere (class LearningSwitch).
57 // The local copy should go away!
58 //
59 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
60 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080061
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080062 // Number of messages sent to switch at once
63 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080064
65 public static final short PRIORITY_DEFAULT = 100;
66 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
67 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
68
/**
 * Processing state of a per-switch message queue.
 */
public enum QueueState {
    /** The processing thread sends messages from the queue. */
    READY,

    /** The processing thread skips the queue until it is resumed. */
    SUSPENDED
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070073
Naoki Shiotac1601d32013-11-20 10:47:34 -080074 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080075 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 * This consists of queue itself and variables used for limiting sending rate.
77 * @author Naoki Shiota
78 *
79 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080080 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080081 private class SwitchQueue extends ArrayDeque<OFMessage> {
82 QueueState state;
83
Naoki Shiotae3199732013-11-25 16:14:43 -080084 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080085 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070086 long last_sent_time = 0;
87 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080088
Naoki Shiotae3199732013-11-25 16:14:43 -080089 // "To be deleted" flag
90 boolean toBeDeleted = false;
91
Naoki Shiota7d0cf272013-11-05 10:18:12 -080092 /**
93 * Check if sending rate is within the rate
94 * @param current Current time
95 * @return true if within the rate
96 */
97 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080098 if (max_rate == 0) {
99 // no limitation
100 return true;
101 }
102
Naoki Shiota81dbe302013-11-21 15:35:38 -0800103 if (current == last_sent_time) {
104 return false;
105 }
106
Naoki Shiotac1601d32013-11-20 10:47:34 -0800107 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800108 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800109 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800110 }
111
Naoki Shiota81dbe302013-11-21 15:35:38 -0800112 /**
113 * Log time and size of last sent data.
114 * @param current Time to be sent.
115 * @param size Size of sent data (in bytes).
116 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800117 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800118 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800119 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800120 }
121
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700122 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123
Naoki Shiotac1601d32013-11-20 10:47:34 -0800124 private OFMessageDamper messageDamper = null;
125 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700126
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800127 private FloodlightContext context = null;
128 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800129
130 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800131 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800132 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800133 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
134 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800135
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800136 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800137
Naoki Shiota8739faa2013-11-18 17:00:25 -0800138 /**
139 * Main thread that reads messages from queues and sends them to switches.
140 * @author Naoki Shiota
141 *
142 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800143 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800144 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800145 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800146
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800147 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800148 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800149
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700150 @Override
151 public void run() {
152 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800153 try {
154 // wait for message pushed to queue
155 mutex.acquire();
156 } catch (InterruptedException e) {
157 e.printStackTrace();
158 log.debug("FlowPusherThread is interrupted");
159 return;
160 }
161
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800162 // for safety of concurrent access, copy all key objects
163 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800164 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800165 for (IOFSwitch sw : queues.keySet()) {
166 keys.add(sw);
167 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800168 }
169
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800170 for (IOFSwitch sw : keys) {
171 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800172
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700173 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800174 if (sw == null || queue == null ||
175 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700176 continue;
177 }
178
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800179 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800180 processQueue(sw, queue, MAX_MESSAGE_SEND);
181 if (queue.isEmpty()) {
182 // remove queue if flagged to be.
183 if (queue.toBeDeleted) {
184 synchronized (queues) {
185 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800186 }
187 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800188 } else {
189 // if some messages remains in queue, latch down
190 if (mutex.availablePermits() == 0) {
191 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800192 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700193 }
194 }
195 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700196 }
197 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800198
199 /**
200 * Read messages from queue and send them to the switch.
201 * If number of messages excess the limit, stop sending messages.
202 * @param sw Switch to which messages will be sent.
203 * @param queue Queue of messages.
204 * @param max_msg Limitation of number of messages to be sent. If set to 0,
205 * all messages in queue will be sent.
206 */
207 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
208 // check sending rate and determine it to be sent or not
209 long current_time = System.currentTimeMillis();
210 long size = 0;
211
212 if (queue.isSendable(current_time)) {
213 int i = 0;
214 while (! queue.isEmpty()) {
215 // Number of messages excess the limit
216 if (0 < max_msg && max_msg <= i) {
217 break;
218 }
219 ++i;
220
221 OFMessage msg = queue.poll();
222 try {
223 messageDamper.write(sw, msg, context);
224 log.debug("Pusher sends message : {}", msg);
225 size += msg.getLength();
226 } catch (IOException e) {
227 e.printStackTrace();
228 log.error("Exception in sending message ({}) : {}", msg, e);
229 }
230 }
231 sw.flush();
232 queue.logSentData(current_time, size);
233 }
234 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700235 }
236
Naoki Shiotac1601d32013-11-20 10:47:34 -0800237 /**
238 * Initialize object with one thread.
239 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800240 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800241 }
242
/**
 * Initialize object with threads of given number.
 * @param number_thread Number of threads to handle messages. The value is
 *        used as the divisor when mapping a switch to a thread, so values
 *        below 1 are replaced by 1 (with a warning).
 */
public FlowPusher(int number_thread) {
    if (number_thread < 1) {
        log.warn("Invalid number of threads ({}); using 1 instead.", number_thread);
    }
    this.number_thread = Math.max(1, number_thread);
}
250
Naoki Shiotac1601d32013-11-20 10:47:34 -0800251 /**
252 * Set parameters needed for sending messages.
253 * @param context FloodlightContext used for sending messages.
254 * If null, FlowPusher uses default context.
255 * @param modContext FloodlightModuleContext used for acquiring
256 * ThreadPoolService and registering MessageListener.
257 * @param factory Factory object to create OFMessage objects.
258 * @param damper Message damper used for sending messages.
259 * If null, FlowPusher creates its own damper object.
260 */
261 public void init(FloodlightContext context,
262 FloodlightModuleContext modContext,
263 BasicFactory factory,
264 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700265 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800266 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800267 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
268 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
269 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800270
271 if (damper != null) {
272 messageDamper = damper;
273 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800274 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800275 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
276 EnumSet.of(OFType.FLOW_MOD),
277 OFMESSAGE_DAMPER_TIMEOUT);
278 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700279 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800280
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800281 /**
282 * Begin processing queue.
283 */
284 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800285 if (factory == null) {
286 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800287 return;
288 }
289
Naoki Shiota81dbe302013-11-21 15:35:38 -0800290 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800291 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800292 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800293
Naoki Shiota81dbe302013-11-21 15:35:38 -0800294 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800295 thread.start();
296 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700297 }
298
Brian O'Connor8c166a72013-11-14 18:41:48 -0800299 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800300 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800301 SwitchQueue queue = getQueue(sw);
302
303 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800304 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800305 }
306
307 synchronized (queue) {
308 if (queue.state == QueueState.READY) {
309 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800310 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800311 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800312 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 }
314 }
315
Brian O'Connor8c166a72013-11-14 18:41:48 -0800316 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800317 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800318 SwitchQueue queue = getQueue(sw);
319
320 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800321 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800322 }
323
324 synchronized (queue) {
325 if (queue.state == QueueState.SUSPENDED) {
326 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800327
328 // Latch down if queue is not empty
329 FlowPusherThread thread = getProcess(sw);
330 if (! queue.isEmpty() &&
331 thread.mutex.availablePermits() == 0) {
332 thread.mutex.release();
333 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800334 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800335 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800336 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800337 }
338 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800339
Brian O'Connor8c166a72013-11-14 18:41:48 -0800340 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800341 public boolean isSuspended(IOFSwitch sw) {
342 SwitchQueue queue = getQueue(sw);
343
344 if (queue == null) {
345 // TODO Is true suitable for this case?
346 return true;
347 }
348
349 return (queue.state == QueueState.SUSPENDED);
350 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800351
352 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800353 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800354 */
355 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800356 if (threadMap == null) {
357 return;
358 }
359
Naoki Shiota81dbe302013-11-21 15:35:38 -0800360 for (FlowPusherThread t : threadMap.values()) {
361 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800362 }
363 }
364
Naoki Shiotae3199732013-11-25 16:14:43 -0800365 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800366 public void setRate(IOFSwitch sw, long rate) {
367 SwitchQueue queue = getQueue(sw);
368 if (queue == null) {
369 return;
370 }
371
372 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800373 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800374 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700375 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700376 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800377
378 @Override
379 public boolean createQueue(IOFSwitch sw) {
380 SwitchQueue queue = getQueue(sw);
381 if (queue != null) {
382 return false;
383 }
384
385 FlowPusherThread proc = getProcess(sw);
386 queue = new SwitchQueue();
387 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800388 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800389 proc.queues.put(sw, queue);
390 }
391
392 return true;
393 }
394
395 @Override
396 public boolean deleteQueue(IOFSwitch sw) {
397 return deleteQueue(sw, false);
398 }
399
400 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800401 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800402 FlowPusherThread proc = getProcess(sw);
403
Naoki Shiotab485d412013-11-26 12:04:19 -0800404 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800405 synchronized (proc.queues) {
406 SwitchQueue queue = proc.queues.remove(sw);
407 if (queue == null) {
408 return false;
409 }
410 }
411 return true;
412 } else {
413 SwitchQueue queue = getQueue(sw);
414 if (queue == null) {
415 return false;
416 }
417 synchronized (queue) {
418 queue.toBeDeleted = true;
419 }
420 return true;
421 }
422 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700423
Brian O'Connor8c166a72013-11-14 18:41:48 -0800424 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800425 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800426 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800427 SwitchQueue queue = proc.queues.get(sw);
428
Naoki Shiotab485d412013-11-26 12:04:19 -0800429 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800430 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800431 createQueue(sw);
432 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800433 }
434
435 synchronized (queue) {
436 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800437 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800438 }
439
Naoki Shiota81dbe302013-11-21 15:35:38 -0800440 if (proc.mutex.availablePermits() == 0) {
441 proc.mutex.release();
442 }
443
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800444 return true;
445 }
446
Brian O'Connor8c166a72013-11-14 18:41:48 -0800447 @Override
Pavlin Radoslavov6bfaea62013-12-03 14:55:57 -0800448 public boolean add(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800449 //
450 // Create the OpenFlow Flow Modification Entry to push
451 //
452 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
453 long cookie = flowEntry.flowEntryId().value();
454
455 short flowModCommand = OFFlowMod.OFPFC_ADD;
456 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
457 flowModCommand = OFFlowMod.OFPFC_ADD;
458 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
459 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
460 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
461 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
462 } else {
463 // Unknown user state. Ignore the entry
464 log.debug(
465 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
466 flowEntry.flowEntryId().toString(),
467 flowEntry.flowEntryUserState());
468 return false;
469 }
470
471 //
472 // Fetch the match conditions.
473 //
474 // NOTE: The Flow matching conditions common for all Flow Entries are
475 // used ONLY if a Flow Entry does NOT have the corresponding matching
476 // condition set.
477 //
478 OFMatch match = new OFMatch();
479 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800480 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
481
482 // Match the Incoming Port
483 Port matchInPort = flowEntryMatch.inPort();
484 if (matchInPort != null) {
485 match.setInputPort(matchInPort.value());
486 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
487 }
488
489 // Match the Source MAC address
490 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800491 if (matchSrcMac != null) {
492 match.setDataLayerSource(matchSrcMac.toString());
493 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
494 }
495
496 // Match the Destination MAC address
497 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800498 if (matchDstMac != null) {
499 match.setDataLayerDestination(matchDstMac.toString());
500 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
501 }
502
503 // Match the Ethernet Frame Type
504 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800505 if (matchEthernetFrameType != null) {
506 match.setDataLayerType(matchEthernetFrameType);
507 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
508 }
509
510 // Match the VLAN ID
511 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800512 if (matchVlanId != null) {
513 match.setDataLayerVirtualLan(matchVlanId);
514 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
515 }
516
517 // Match the VLAN priority
518 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800519 if (matchVlanPriority != null) {
520 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
521 match.setWildcards(match.getWildcards()
522 & ~OFMatch.OFPFW_DL_VLAN_PCP);
523 }
524
525 // Match the Source IPv4 Network prefix
526 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800527 if (matchSrcIPv4Net != null) {
528 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
529 }
530
531 // Natch the Destination IPv4 Network prefix
532 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800533 if (matchDstIPv4Net != null) {
534 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
535 }
536
537 // Match the IP protocol
538 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800539 if (matchIpProto != null) {
540 match.setNetworkProtocol(matchIpProto);
541 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
542 }
543
544 // Match the IP ToS (DSCP field, 6 bits)
545 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800546 if (matchIpToS != null) {
547 match.setNetworkTypeOfService(matchIpToS);
548 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
549 }
550
551 // Match the Source TCP/UDP port
552 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800553 if (matchSrcTcpUdpPort != null) {
554 match.setTransportSource(matchSrcTcpUdpPort);
555 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
556 }
557
558 // Match the Destination TCP/UDP port
559 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800560 if (matchDstTcpUdpPort != null) {
561 match.setTransportDestination(matchDstTcpUdpPort);
562 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
563 }
564
565 //
566 // Fetch the actions
567 //
568 Short actionOutputPort = null;
569 List<OFAction> openFlowActions = new ArrayList<OFAction>();
570 int actionsLen = 0;
571 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
572 //
573 for (FlowEntryAction action : flowEntryActions.actions()) {
574 ActionOutput actionOutput = action.actionOutput();
575 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
576 ActionSetVlanPriority actionSetVlanPriority = action
577 .actionSetVlanPriority();
578 ActionStripVlan actionStripVlan = action.actionStripVlan();
579 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
580 .actionSetEthernetSrcAddr();
581 ActionSetEthernetAddr actionSetEthernetDstAddr = action
582 .actionSetEthernetDstAddr();
583 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
584 .actionSetIPv4SrcAddr();
585 ActionSetIPv4Addr actionSetIPv4DstAddr = action
586 .actionSetIPv4DstAddr();
587 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
588 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
589 .actionSetTcpUdpSrcPort();
590 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
591 .actionSetTcpUdpDstPort();
592 ActionEnqueue actionEnqueue = action.actionEnqueue();
593
594 if (actionOutput != null) {
595 actionOutputPort = actionOutput.port().value();
596 // XXX: The max length is hard-coded for now
597 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
598 .value(), (short) 0xffff);
599 openFlowActions.add(ofa);
600 actionsLen += ofa.getLength();
601 }
602
603 if (actionSetVlanId != null) {
604 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
605 actionSetVlanId.vlanId());
606 openFlowActions.add(ofa);
607 actionsLen += ofa.getLength();
608 }
609
610 if (actionSetVlanPriority != null) {
611 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
612 actionSetVlanPriority.vlanPriority());
613 openFlowActions.add(ofa);
614 actionsLen += ofa.getLength();
615 }
616
617 if (actionStripVlan != null) {
618 if (actionStripVlan.stripVlan() == true) {
619 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
620 openFlowActions.add(ofa);
621 actionsLen += ofa.getLength();
622 }
623 }
624
625 if (actionSetEthernetSrcAddr != null) {
626 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
627 actionSetEthernetSrcAddr.addr().toBytes());
628 openFlowActions.add(ofa);
629 actionsLen += ofa.getLength();
630 }
631
632 if (actionSetEthernetDstAddr != null) {
633 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
634 actionSetEthernetDstAddr.addr().toBytes());
635 openFlowActions.add(ofa);
636 actionsLen += ofa.getLength();
637 }
638
639 if (actionSetIPv4SrcAddr != null) {
640 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
641 actionSetIPv4SrcAddr.addr().value());
642 openFlowActions.add(ofa);
643 actionsLen += ofa.getLength();
644 }
645
646 if (actionSetIPv4DstAddr != null) {
647 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
648 actionSetIPv4DstAddr.addr().value());
649 openFlowActions.add(ofa);
650 actionsLen += ofa.getLength();
651 }
652
653 if (actionSetIpToS != null) {
654 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
655 actionSetIpToS.ipToS());
656 openFlowActions.add(ofa);
657 actionsLen += ofa.getLength();
658 }
659
660 if (actionSetTcpUdpSrcPort != null) {
661 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
662 actionSetTcpUdpSrcPort.port());
663 openFlowActions.add(ofa);
664 actionsLen += ofa.getLength();
665 }
666
667 if (actionSetTcpUdpDstPort != null) {
668 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
669 actionSetTcpUdpDstPort.port());
670 openFlowActions.add(ofa);
671 actionsLen += ofa.getLength();
672 }
673
674 if (actionEnqueue != null) {
675 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
676 .value(), actionEnqueue.queueId());
677 openFlowActions.add(ofa);
678 actionsLen += ofa.getLength();
679 }
680 }
681
682 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
683 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
684 .setPriority(PRIORITY_DEFAULT)
685 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
686 .setCommand(flowModCommand).setMatch(match)
687 .setActions(openFlowActions)
688 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
689 fm.setOutPort(OFPort.OFPP_NONE.getValue());
690 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
691 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
692 if (actionOutputPort != null)
693 fm.setOutPort(actionOutputPort);
694 }
695
696 //
697 // TODO: Set the following flag
698 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
699 // See method ForwardingBase::pushRoute()
700 //
701
702 //
703 // Write the message to the switch
704 //
Pavlin Radoslavovf0678902013-12-03 15:06:56 -0800705 log.debug("Installing flow entry "
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800706 + flowEntry.flowEntryUserState() + " into switch DPID: "
707 + sw.getStringId() + " flowEntryId: "
708 + flowEntry.flowEntryId().toString() + " srcMac: "
709 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
710 + matchInPort + " outPort: " + actionOutputPort);
711
712 //
713 // TODO: We should use the OpenFlow Barrier mechanism
714 // to check for errors, and update the SwitchState
715 // for a flow entry after the Barrier message is
716 // is received.
717 //
718 // TODO: The FlowEntry Object in Titan should be set
719 // to FE_SWITCH_UPDATED.
720 //
721
722 return add(sw,fm);
723 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800724
725 @Override
726 public OFBarrierReply barrier(IOFSwitch sw) {
727 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
728 if (future == null) {
729 return null;
730 }
731
732 try {
733 return future.get();
734 } catch (InterruptedException e) {
735 e.printStackTrace();
736 log.error("InterruptedException: {}", e);
737 return null;
738 } catch (ExecutionException e) {
739 e.printStackTrace();
740 log.error("ExecutionException: {}", e);
741 return null;
742 }
743 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800744
Naoki Shiotac1601d32013-11-20 10:47:34 -0800745 @Override
746 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
747 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800748
749 if (sw == null) {
750 return null;
751 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800752
Naoki Shiota81dbe302013-11-21 15:35:38 -0800753 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800754 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800755
Naoki Shiotac1601d32013-11-20 10:47:34 -0800756 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800757 synchronized (barrierFutures) {
758 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
759 if (map == null) {
760 map = new HashMap<Integer,OFBarrierReplyFuture>();
761 barrierFutures.put(sw.getId(), map);
762 }
763 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800764 }
765
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800766 add(sw, msg);
767
Naoki Shiotac1601d32013-11-20 10:47:34 -0800768 return future;
769 }
770
Naoki Shiotae3199732013-11-25 16:14:43 -0800771 /**
772 * Get a queue attached to a switch.
773 * @param sw Switch object
774 * @return Queue object
775 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800776 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800777 if (sw == null) {
778 return null;
779 }
780
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800781 return getProcess(sw).queues.get(sw);
782 }
783
Naoki Shiotae3199732013-11-25 16:14:43 -0800784 /**
785 * Get a hash value correspondent to a switch.
786 * @param sw Switch object
787 * @return Hash value
788 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800789 protected long getHash(IOFSwitch sw) {
790 // This code assumes DPID is sequentially assigned.
791 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800792 return sw.getId() % number_thread;
793 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800794
795 /**
796 * Get a Thread object which processes the queue attached to a switch.
797 * @param sw Switch object
798 * @return Thread object
799 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800800 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800801 long hash = getHash(sw);
802
803 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800804 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800805
806 @Override
807 public String getName() {
808 return "flowpusher";
809 }
810
811 @Override
812 public boolean isCallbackOrderingPrereq(OFType type, String name) {
813 return false;
814 }
815
816 @Override
817 public boolean isCallbackOrderingPostreq(OFType type, String name) {
818 return false;
819 }
820
821 @Override
822 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -0800823 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
824 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800825 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800826 return Command.CONTINUE;
827 }
828
829 OFBarrierReplyFuture future = map.get(msg.getXid());
830 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800831 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800832 return Command.CONTINUE;
833 }
834
835 log.debug("Received BARRIER_REPLY : {}", msg);
836 future.deliverFuture(sw, msg);
837
838 return Command.CONTINUE;
839 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800840
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700841}