blob: 452938214a1540bf0b79ea50d2a422d509eeb7f7 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080013import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080014import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080015import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070016
Naoki Shiota7d0cf272013-11-05 10:18:12 -080017import org.openflow.protocol.*;
18import org.openflow.protocol.action.*;
19import org.openflow.protocol.factory.BasicFactory;
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070022
23import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.IFloodlightProviderService;
25import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080027import net.floodlightcontroller.core.internal.OFMessageFuture;
28import net.floodlightcontroller.core.module.FloodlightModuleContext;
29import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080030import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080031import net.floodlightcontroller.util.OFMessageDamper;
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -080032import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryAction;
34import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080039import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080040import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080041import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
43/**
Naoki Shiotab485d412013-11-26 12:04:19 -080044 * FlowPusher is a implementation of FlowPusherService.
45 * FlowPusher assigns one message queue instance for each one switch.
46 * Number of message processing threads is configurable by constructor, and
47 * one thread can handle multiple message queues. Each queue will be assigned to
48 * a thread according to hash function defined by getHash().
49 * Each processing thread reads messages from queues and sends it to switches
50 * in round-robin. Processing thread also calculates rate of sending to suppress
51 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070052 * @author Naoki Shiota
53 *
54 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -080057 protected volatile IFlowService flowManager;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080058
59 // NOTE: Below are moved from FlowManager.
60 // TODO: Values copied from elsewhere (class LearningSwitch).
61 // The local copy should go away!
62 //
63 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
64 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080065
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080066 // Number of messages sent to switch at once
67 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080068
69 public static final short PRIORITY_DEFAULT = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080070
71 public enum QueueState {
72 READY,
73 SUSPENDED,
74 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070075
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080077 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080078 * This consists of queue itself and variables used for limiting sending rate.
79 * @author Naoki Shiota
80 *
81 */
Naoki Shiota7d0cf272013-11-05 10:18:12 -080082 private class SwitchQueue extends ArrayDeque<OFMessage> {
Naoki Shiota991093a2013-12-10 14:47:18 -080083 private static final long serialVersionUID = 1L;
84
Naoki Shiota7d0cf272013-11-05 10:18:12 -080085 QueueState state;
86
Naoki Shiotae3199732013-11-25 16:14:43 -080087 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080088 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070089 long last_sent_time = 0;
90 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091
Naoki Shiotae3199732013-11-25 16:14:43 -080092 // "To be deleted" flag
93 boolean toBeDeleted = false;
94
Naoki Shiota7d0cf272013-11-05 10:18:12 -080095 /**
96 * Check if sending rate is within the rate
97 * @param current Current time
98 * @return true if within the rate
99 */
100 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800101 if (max_rate == 0) {
102 // no limitation
103 return true;
104 }
105
Naoki Shiota81dbe302013-11-21 15:35:38 -0800106 if (current == last_sent_time) {
107 return false;
108 }
109
Naoki Shiotac1601d32013-11-20 10:47:34 -0800110 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800111 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800112 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800113 }
114
Naoki Shiota81dbe302013-11-21 15:35:38 -0800115 /**
116 * Log time and size of last sent data.
117 * @param current Time to be sent.
118 * @param size Size of sent data (in bytes).
119 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800120 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800122 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123 }
124
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700125 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800126
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 private OFMessageDamper messageDamper = null;
128 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700129
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 private FloodlightContext context = null;
131 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800132
133 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800134 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800135 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800136 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
137 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800139 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140
Naoki Shiota8739faa2013-11-18 17:00:25 -0800141 /**
142 * Main thread that reads messages from queues and sends them to switches.
143 * @author Naoki Shiota
144 *
145 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800146 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800147 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800148 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800149
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800150 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800151 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800152
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700153 @Override
154 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800155 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700156 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800157 try {
158 // wait for message pushed to queue
159 mutex.acquire();
160 } catch (InterruptedException e) {
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800161 // not an error
Naoki Shiota81dbe302013-11-21 15:35:38 -0800162 log.debug("FlowPusherThread is interrupted");
163 return;
164 }
165
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800166 // for safety of concurrent access, copy all key objects
167 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800168 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800169 for (IOFSwitch sw : queues.keySet()) {
170 keys.add(sw);
171 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800172 }
173
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800174 for (IOFSwitch sw : keys) {
175 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800176
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700177 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800178 if (sw == null || queue == null ||
179 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700180 continue;
181 }
182
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800183 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800184 processQueue(sw, queue, MAX_MESSAGE_SEND);
185 if (queue.isEmpty()) {
186 // remove queue if flagged to be.
187 if (queue.toBeDeleted) {
188 synchronized (queues) {
189 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800190 }
191 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800192 } else {
193 // if some messages remains in queue, latch down
194 if (mutex.availablePermits() == 0) {
195 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800196 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700197 }
198 }
199 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700200 }
201 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800202
203 /**
204 * Read messages from queue and send them to the switch.
205 * If number of messages excess the limit, stop sending messages.
206 * @param sw Switch to which messages will be sent.
207 * @param queue Queue of messages.
208 * @param max_msg Limitation of number of messages to be sent. If set to 0,
209 * all messages in queue will be sent.
210 */
211 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
212 // check sending rate and determine it to be sent or not
213 long current_time = System.currentTimeMillis();
214 long size = 0;
215
216 if (queue.isSendable(current_time)) {
217 int i = 0;
218 while (! queue.isEmpty()) {
219 // Number of messages excess the limit
220 if (0 < max_msg && max_msg <= i) {
221 break;
222 }
223 ++i;
224
225 OFMessage msg = queue.poll();
226 try {
227 messageDamper.write(sw, msg, context);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800228// log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800229 size += msg.getLength();
230 } catch (IOException e) {
231 e.printStackTrace();
232 log.error("Exception in sending message ({}) : {}", msg, e);
233 }
234 }
235 sw.flush();
236 queue.logSentData(current_time, size);
237 }
238 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700239 }
240
Naoki Shiotac1601d32013-11-20 10:47:34 -0800241 /**
242 * Initialize object with one thread.
243 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800244 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800245 }
246
Naoki Shiotac1601d32013-11-20 10:47:34 -0800247 /**
248 * Initialize object with threads of given number.
249 * @param number_thread Number of threads to handle messages.
250 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800251 public FlowPusher(int number_thread) {
252 this.number_thread = number_thread;
253 }
254
Naoki Shiotac1601d32013-11-20 10:47:34 -0800255 /**
256 * Set parameters needed for sending messages.
257 * @param context FloodlightContext used for sending messages.
258 * If null, FlowPusher uses default context.
259 * @param modContext FloodlightModuleContext used for acquiring
260 * ThreadPoolService and registering MessageListener.
261 * @param factory Factory object to create OFMessage objects.
262 * @param damper Message damper used for sending messages.
263 * If null, FlowPusher creates its own damper object.
264 */
265 public void init(FloodlightContext context,
266 FloodlightModuleContext modContext,
267 BasicFactory factory,
268 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700269 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800270 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800271 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
272 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
273 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -0800274 flowManager = modContext.getServiceImpl(IFlowService.class);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800275
276 if (damper != null) {
277 messageDamper = damper;
278 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800279 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800280 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
281 EnumSet.of(OFType.FLOW_MOD),
282 OFMESSAGE_DAMPER_TIMEOUT);
283 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700284 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800285
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800286 /**
287 * Begin processing queue.
288 */
289 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800290 if (factory == null) {
291 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800292 return;
293 }
294
Naoki Shiota81dbe302013-11-21 15:35:38 -0800295 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800296 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800297 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800298
Naoki Shiota81dbe302013-11-21 15:35:38 -0800299 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800300 thread.start();
301 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700302 }
303
Brian O'Connor8c166a72013-11-14 18:41:48 -0800304 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800305 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800306 SwitchQueue queue = getQueue(sw);
307
308 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800309 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800310 }
311
312 synchronized (queue) {
313 if (queue.state == QueueState.READY) {
314 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800317 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800318 }
319 }
320
Brian O'Connor8c166a72013-11-14 18:41:48 -0800321 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800322 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800323 SwitchQueue queue = getQueue(sw);
324
325 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800326 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800327 }
328
329 synchronized (queue) {
330 if (queue.state == QueueState.SUSPENDED) {
331 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800332
333 // Latch down if queue is not empty
334 FlowPusherThread thread = getProcess(sw);
335 if (! queue.isEmpty() &&
336 thread.mutex.availablePermits() == 0) {
337 thread.mutex.release();
338 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800339 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800340 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800341 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800342 }
343 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800344
Brian O'Connor8c166a72013-11-14 18:41:48 -0800345 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800346 public boolean isSuspended(IOFSwitch sw) {
347 SwitchQueue queue = getQueue(sw);
348
349 if (queue == null) {
350 // TODO Is true suitable for this case?
351 return true;
352 }
353
354 return (queue.state == QueueState.SUSPENDED);
355 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800356
357 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800358 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800359 */
360 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800361 if (threadMap == null) {
362 return;
363 }
364
Naoki Shiota81dbe302013-11-21 15:35:38 -0800365 for (FlowPusherThread t : threadMap.values()) {
366 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800367 }
368 }
369
Naoki Shiotae3199732013-11-25 16:14:43 -0800370 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800371 public void setRate(IOFSwitch sw, long rate) {
372 SwitchQueue queue = getQueue(sw);
373 if (queue == null) {
374 return;
375 }
376
377 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800378 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800379 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700380 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700381 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800382
383 @Override
384 public boolean createQueue(IOFSwitch sw) {
385 SwitchQueue queue = getQueue(sw);
386 if (queue != null) {
387 return false;
388 }
389
390 FlowPusherThread proc = getProcess(sw);
391 queue = new SwitchQueue();
392 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800393 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800394 proc.queues.put(sw, queue);
395 }
396
397 return true;
398 }
399
400 @Override
401 public boolean deleteQueue(IOFSwitch sw) {
402 return deleteQueue(sw, false);
403 }
404
405 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800406 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800407 FlowPusherThread proc = getProcess(sw);
408
Naoki Shiotab485d412013-11-26 12:04:19 -0800409 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800410 synchronized (proc.queues) {
411 SwitchQueue queue = proc.queues.remove(sw);
412 if (queue == null) {
413 return false;
414 }
415 }
416 return true;
417 } else {
418 SwitchQueue queue = getQueue(sw);
419 if (queue == null) {
420 return false;
421 }
422 synchronized (queue) {
423 queue.toBeDeleted = true;
424 }
425 return true;
426 }
427 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700428
Brian O'Connor8c166a72013-11-14 18:41:48 -0800429 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800430 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800431 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800432 SwitchQueue queue = proc.queues.get(sw);
433
Naoki Shiotab485d412013-11-26 12:04:19 -0800434 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800435 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800436 createQueue(sw);
437 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800438 }
439
440 synchronized (queue) {
441 queue.add(msg);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800442// log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800443 }
444
Naoki Shiota81dbe302013-11-21 15:35:38 -0800445 if (proc.mutex.availablePermits() == 0) {
446 proc.mutex.release();
447 }
448
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800449 return true;
450 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800451
Brian O'Connor8c166a72013-11-14 18:41:48 -0800452 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800453 public void pushFlowEntries(
454 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
455
456 List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
457 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
458
459 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Pavlin Radoslavov4535bc12013-12-05 10:43:49 -0800460 if (add(entry.first, entry.second)) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800461 pushedEntries.add(entry);
462 }
463 }
464
465 //
466 // TODO: We should use the OpenFlow Barrier mechanism
467 // to check for errors, and update the SwitchState
468 // for a flow entry after the Barrier message is
469 // is received.
470 // Only after inform the Flow Manager that the entry is pushed.
471 //
472 flowManager.flowEntriesPushedToSwitch(pushedEntries);
473 }
474
475 @Override
476 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
477 Collection<Pair<IOFSwitch, FlowEntry>> entries =
478 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
479
480 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
481 pushFlowEntries(entries);
482 }
483
484 /**
485 * Create a message from FlowEntry and add it to the queue of the switch.
486 * @param sw Switch to which message is pushed.
487 * @param flowEntry FlowEntry object used for creating message.
488 * @return true if message is successfully added to a queue.
489 */
490 private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800491 //
492 // Create the OpenFlow Flow Modification Entry to push
493 //
494 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
495 long cookie = flowEntry.flowEntryId().value();
496
497 short flowModCommand = OFFlowMod.OFPFC_ADD;
498 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
499 flowModCommand = OFFlowMod.OFPFC_ADD;
500 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
501 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
502 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
503 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
504 } else {
505 // Unknown user state. Ignore the entry
506 log.debug(
507 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
508 flowEntry.flowEntryId().toString(),
509 flowEntry.flowEntryUserState());
510 return false;
511 }
512
513 //
514 // Fetch the match conditions.
515 //
516 // NOTE: The Flow matching conditions common for all Flow Entries are
517 // used ONLY if a Flow Entry does NOT have the corresponding matching
518 // condition set.
519 //
520 OFMatch match = new OFMatch();
521 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800522 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
523
524 // Match the Incoming Port
525 Port matchInPort = flowEntryMatch.inPort();
526 if (matchInPort != null) {
527 match.setInputPort(matchInPort.value());
528 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
529 }
530
531 // Match the Source MAC address
532 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800533 if (matchSrcMac != null) {
534 match.setDataLayerSource(matchSrcMac.toString());
535 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
536 }
537
538 // Match the Destination MAC address
539 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800540 if (matchDstMac != null) {
541 match.setDataLayerDestination(matchDstMac.toString());
542 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
543 }
544
545 // Match the Ethernet Frame Type
546 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800547 if (matchEthernetFrameType != null) {
548 match.setDataLayerType(matchEthernetFrameType);
549 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
550 }
551
552 // Match the VLAN ID
553 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800554 if (matchVlanId != null) {
555 match.setDataLayerVirtualLan(matchVlanId);
556 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
557 }
558
559 // Match the VLAN priority
560 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800561 if (matchVlanPriority != null) {
562 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
563 match.setWildcards(match.getWildcards()
564 & ~OFMatch.OFPFW_DL_VLAN_PCP);
565 }
566
567 // Match the Source IPv4 Network prefix
568 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800569 if (matchSrcIPv4Net != null) {
570 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
571 }
572
573 // Natch the Destination IPv4 Network prefix
574 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800575 if (matchDstIPv4Net != null) {
576 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
577 }
578
579 // Match the IP protocol
580 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800581 if (matchIpProto != null) {
582 match.setNetworkProtocol(matchIpProto);
583 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
584 }
585
586 // Match the IP ToS (DSCP field, 6 bits)
587 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800588 if (matchIpToS != null) {
589 match.setNetworkTypeOfService(matchIpToS);
590 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
591 }
592
593 // Match the Source TCP/UDP port
594 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800595 if (matchSrcTcpUdpPort != null) {
596 match.setTransportSource(matchSrcTcpUdpPort);
597 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
598 }
599
600 // Match the Destination TCP/UDP port
601 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800602 if (matchDstTcpUdpPort != null) {
603 match.setTransportDestination(matchDstTcpUdpPort);
604 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
605 }
606
607 //
608 // Fetch the actions
609 //
610 Short actionOutputPort = null;
611 List<OFAction> openFlowActions = new ArrayList<OFAction>();
612 int actionsLen = 0;
613 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
614 //
615 for (FlowEntryAction action : flowEntryActions.actions()) {
616 ActionOutput actionOutput = action.actionOutput();
617 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
618 ActionSetVlanPriority actionSetVlanPriority = action
619 .actionSetVlanPriority();
620 ActionStripVlan actionStripVlan = action.actionStripVlan();
621 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
622 .actionSetEthernetSrcAddr();
623 ActionSetEthernetAddr actionSetEthernetDstAddr = action
624 .actionSetEthernetDstAddr();
625 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
626 .actionSetIPv4SrcAddr();
627 ActionSetIPv4Addr actionSetIPv4DstAddr = action
628 .actionSetIPv4DstAddr();
629 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
630 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
631 .actionSetTcpUdpSrcPort();
632 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
633 .actionSetTcpUdpDstPort();
634 ActionEnqueue actionEnqueue = action.actionEnqueue();
635
636 if (actionOutput != null) {
637 actionOutputPort = actionOutput.port().value();
638 // XXX: The max length is hard-coded for now
639 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
640 .value(), (short) 0xffff);
641 openFlowActions.add(ofa);
642 actionsLen += ofa.getLength();
643 }
644
645 if (actionSetVlanId != null) {
646 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
647 actionSetVlanId.vlanId());
648 openFlowActions.add(ofa);
649 actionsLen += ofa.getLength();
650 }
651
652 if (actionSetVlanPriority != null) {
653 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
654 actionSetVlanPriority.vlanPriority());
655 openFlowActions.add(ofa);
656 actionsLen += ofa.getLength();
657 }
658
659 if (actionStripVlan != null) {
660 if (actionStripVlan.stripVlan() == true) {
661 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
662 openFlowActions.add(ofa);
663 actionsLen += ofa.getLength();
664 }
665 }
666
667 if (actionSetEthernetSrcAddr != null) {
668 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
669 actionSetEthernetSrcAddr.addr().toBytes());
670 openFlowActions.add(ofa);
671 actionsLen += ofa.getLength();
672 }
673
674 if (actionSetEthernetDstAddr != null) {
675 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
676 actionSetEthernetDstAddr.addr().toBytes());
677 openFlowActions.add(ofa);
678 actionsLen += ofa.getLength();
679 }
680
681 if (actionSetIPv4SrcAddr != null) {
682 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
683 actionSetIPv4SrcAddr.addr().value());
684 openFlowActions.add(ofa);
685 actionsLen += ofa.getLength();
686 }
687
688 if (actionSetIPv4DstAddr != null) {
689 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
690 actionSetIPv4DstAddr.addr().value());
691 openFlowActions.add(ofa);
692 actionsLen += ofa.getLength();
693 }
694
695 if (actionSetIpToS != null) {
696 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
697 actionSetIpToS.ipToS());
698 openFlowActions.add(ofa);
699 actionsLen += ofa.getLength();
700 }
701
702 if (actionSetTcpUdpSrcPort != null) {
703 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
704 actionSetTcpUdpSrcPort.port());
705 openFlowActions.add(ofa);
706 actionsLen += ofa.getLength();
707 }
708
709 if (actionSetTcpUdpDstPort != null) {
710 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
711 actionSetTcpUdpDstPort.port());
712 openFlowActions.add(ofa);
713 actionsLen += ofa.getLength();
714 }
715
716 if (actionEnqueue != null) {
717 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
718 .value(), actionEnqueue.queueId());
719 openFlowActions.add(ofa);
720 actionsLen += ofa.getLength();
721 }
722 }
723
Pavlin Radoslavov1fe06a22013-12-10 14:12:23 -0800724 fm.setIdleTimeout((short)flowEntry.idleTimeout())
725 .setHardTimeout((short)flowEntry.hardTimeout())
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800726 .setPriority(PRIORITY_DEFAULT)
727 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
728 .setCommand(flowModCommand).setMatch(match)
729 .setActions(openFlowActions)
730 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
731 fm.setOutPort(OFPort.OFPP_NONE.getValue());
732 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
733 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
734 if (actionOutputPort != null)
735 fm.setOutPort(actionOutputPort);
736 }
737
738 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800739 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
740 // permanent.
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800741 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800742 if ((flowEntry.idleTimeout() != 0) ||
743 (flowEntry.hardTimeout() != 0)) {
744 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
745 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800746
747 //
748 // Write the message to the switch
749 //
Pavlin Radoslavovf0678902013-12-03 15:06:56 -0800750 log.debug("Installing flow entry "
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800751 + flowEntry.flowEntryUserState() + " into switch DPID: "
752 + sw.getStringId() + " flowEntryId: "
753 + flowEntry.flowEntryId().toString() + " srcMac: "
754 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
755 + matchInPort + " outPort: " + actionOutputPort);
756
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800757 return add(sw, fm);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800758 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800759
760 @Override
761 public OFBarrierReply barrier(IOFSwitch sw) {
762 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
763 if (future == null) {
764 return null;
765 }
766
767 try {
768 return future.get();
769 } catch (InterruptedException e) {
770 e.printStackTrace();
771 log.error("InterruptedException: {}", e);
772 return null;
773 } catch (ExecutionException e) {
774 e.printStackTrace();
775 log.error("ExecutionException: {}", e);
776 return null;
777 }
778 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800779
Naoki Shiotac1601d32013-11-20 10:47:34 -0800780 @Override
781 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
782 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800783
784 if (sw == null) {
785 return null;
786 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800787
Naoki Shiota81dbe302013-11-21 15:35:38 -0800788 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800789 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800790
Naoki Shiotac1601d32013-11-20 10:47:34 -0800791 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800792 synchronized (barrierFutures) {
793 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
794 if (map == null) {
795 map = new HashMap<Integer,OFBarrierReplyFuture>();
796 barrierFutures.put(sw.getId(), map);
797 }
798 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800799 }
800
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800801 add(sw, msg);
802
Naoki Shiotac1601d32013-11-20 10:47:34 -0800803 return future;
804 }
805
Naoki Shiotae3199732013-11-25 16:14:43 -0800806 /**
807 * Get a queue attached to a switch.
808 * @param sw Switch object
809 * @return Queue object
810 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800811 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800812 if (sw == null) {
813 return null;
814 }
815
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800816 return getProcess(sw).queues.get(sw);
817 }
818
Naoki Shiotae3199732013-11-25 16:14:43 -0800819 /**
820 * Get a hash value correspondent to a switch.
821 * @param sw Switch object
822 * @return Hash value
823 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800824 protected long getHash(IOFSwitch sw) {
825 // This code assumes DPID is sequentially assigned.
826 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800827 return sw.getId() % number_thread;
828 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800829
830 /**
831 * Get a Thread object which processes the queue attached to a switch.
832 * @param sw Switch object
833 * @return Thread object
834 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800835 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800836 long hash = getHash(sw);
837
838 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800839 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800840
841 @Override
842 public String getName() {
843 return "flowpusher";
844 }
845
846 @Override
847 public boolean isCallbackOrderingPrereq(OFType type, String name) {
848 return false;
849 }
850
851 @Override
852 public boolean isCallbackOrderingPostreq(OFType type, String name) {
853 return false;
854 }
855
856 @Override
857 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -0800858 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
859 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800860 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800861 return Command.CONTINUE;
862 }
863
864 OFBarrierReplyFuture future = map.get(msg.getXid());
865 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800866 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800867 return Command.CONTINUE;
868 }
869
870 log.debug("Received BARRIER_REPLY : {}", msg);
871 future.deliverFuture(sw, msg);
872
873 return Command.CONTINUE;
874 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800875
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700876}