blob: 21be62ccb0545208d661f26c0cf9fe9eb77a5669 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080013import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080014import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080015import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070016
Naoki Shiota7d0cf272013-11-05 10:18:12 -080017import org.openflow.protocol.*;
18import org.openflow.protocol.action.*;
19import org.openflow.protocol.factory.BasicFactory;
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070022
23import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.IFloodlightProviderService;
25import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080027import net.floodlightcontroller.core.internal.OFMessageFuture;
28import net.floodlightcontroller.core.module.FloodlightModuleContext;
29import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080030import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080031import net.floodlightcontroller.util.OFMessageDamper;
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -080032import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryAction;
34import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080039import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080040import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080041import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the
 * constructor, and one thread can handle multiple message queues. Each queue
 * is assigned to a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin order. The processing thread also calculates the
 * sending rate in order to suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
// Class-wide logger.
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
// Flow service that is informed after flow entries have been queued
// (see pushFlowEntries()). Assigned in init().
protected volatile IFlowService flowManager;

// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
// Parameters of the default OFMessageDamper that init() creates when the
// caller does not supply one.
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms

// Maximum number of messages sent to one switch per processing pass
// (passed as the limit to FlowPusherThread.processQueue()).
protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080068
/**
 * Processing state of a per-switch message queue.
 * READY queues are drained by the pusher thread; SUSPENDED queues are skipped.
 */
public enum QueueState {
    READY, SUSPENDED
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070073
Naoki Shiotac1601d32013-11-20 10:47:34 -080074 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080075 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 * This consists of queue itself and variables used for limiting sending rate.
77 * @author Naoki Shiota
78 *
79 */
Naoki Shiota7d0cf272013-11-05 10:18:12 -080080 private class SwitchQueue extends ArrayDeque<OFMessage> {
Naoki Shiota991093a2013-12-10 14:47:18 -080081 private static final long serialVersionUID = 1L;
82
Naoki Shiota7d0cf272013-11-05 10:18:12 -080083 QueueState state;
84
Naoki Shiotae3199732013-11-25 16:14:43 -080085 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080086 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070087 long last_sent_time = 0;
88 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080089
Naoki Shiotae3199732013-11-25 16:14:43 -080090 // "To be deleted" flag
91 boolean toBeDeleted = false;
92
Naoki Shiota7d0cf272013-11-05 10:18:12 -080093 /**
94 * Check if sending rate is within the rate
95 * @param current Current time
96 * @return true if within the rate
97 */
98 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080099 if (max_rate == 0) {
100 // no limitation
101 return true;
102 }
103
Naoki Shiota81dbe302013-11-21 15:35:38 -0800104 if (current == last_sent_time) {
105 return false;
106 }
107
Naoki Shiotac1601d32013-11-20 10:47:34 -0800108 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800109 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800110 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800111 }
112
Naoki Shiota81dbe302013-11-21 15:35:38 -0800113 /**
114 * Log time and size of last sent data.
115 * @param current Time to be sent.
116 * @param size Size of sent data (in bytes).
117 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800118 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800119 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800120 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121 }
122
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700123 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800124
// Damper through which all queued messages are written to switches.
// Assigned in init().
private OFMessageDamper messageDamper = null;
// Thread pool service obtained from the module context in init().
private IThreadPoolService threadPool = null;

// Context handed to messageDamper.write() for every message.
private FloodlightContext context = null;
// Factory used to create OpenFlow messages; start() refuses to run
// while this is still null.
private BasicFactory factory = null;

// Map of processing threads. NOTE: start() populates it with the thread
// index (0 .. number_thread - 1) as the key, not with a dpid.
private Map<Long, FlowPusherThread> threadMap = null;
// Map of Future objects versus dpid and transaction ID.
private Map<Long, Map<Integer, OFBarrierReplyFuture>>
    barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();

// Number of processing threads created by start().
private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138
Naoki Shiota8739faa2013-11-18 17:00:25 -0800139 /**
140 * Main thread that reads messages from queues and sends them to switches.
141 * @author Naoki Shiota
142 *
143 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800144 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800145 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800146 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800147
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800148 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800149 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800150
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700151 @Override
152 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800153 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700154 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800155 try {
156 // wait for message pushed to queue
157 mutex.acquire();
158 } catch (InterruptedException e) {
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800159 // not an error
Naoki Shiota81dbe302013-11-21 15:35:38 -0800160 log.debug("FlowPusherThread is interrupted");
161 return;
162 }
163
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800164 // for safety of concurrent access, copy all key objects
165 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800166 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800167 for (IOFSwitch sw : queues.keySet()) {
168 keys.add(sw);
169 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800170 }
171
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800172 for (IOFSwitch sw : keys) {
173 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800174
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700175 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800176 if (sw == null || queue == null ||
177 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700178 continue;
179 }
180
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800181 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800182 processQueue(sw, queue, MAX_MESSAGE_SEND);
183 if (queue.isEmpty()) {
184 // remove queue if flagged to be.
185 if (queue.toBeDeleted) {
186 synchronized (queues) {
187 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800188 }
189 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800190 } else {
191 // if some messages remains in queue, latch down
192 if (mutex.availablePermits() == 0) {
193 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800194 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700195 }
196 }
197 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700198 }
199 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800200
201 /**
202 * Read messages from queue and send them to the switch.
203 * If number of messages excess the limit, stop sending messages.
204 * @param sw Switch to which messages will be sent.
205 * @param queue Queue of messages.
206 * @param max_msg Limitation of number of messages to be sent. If set to 0,
207 * all messages in queue will be sent.
208 */
209 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
210 // check sending rate and determine it to be sent or not
211 long current_time = System.currentTimeMillis();
212 long size = 0;
213
214 if (queue.isSendable(current_time)) {
215 int i = 0;
216 while (! queue.isEmpty()) {
217 // Number of messages excess the limit
218 if (0 < max_msg && max_msg <= i) {
219 break;
220 }
221 ++i;
222
223 OFMessage msg = queue.poll();
224 try {
225 messageDamper.write(sw, msg, context);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800226// log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800227 size += msg.getLength();
228 } catch (IOException e) {
229 e.printStackTrace();
230 log.error("Exception in sending message ({}) : {}", msg, e);
231 }
232 }
233 sw.flush();
234 queue.logSentData(current_time, size);
235 }
236 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700237 }
238
/**
 * Create a FlowPusher that uses a single processing thread.
 */
public FlowPusher() {
    // 1 is also the initial value of the number_thread field.
    this(1);
}

/**
 * Create a FlowPusher that uses the given number of processing threads.
 * The threads themselves are created later, by start().
 * @param number_thread Number of threads to handle messages.
 */
public FlowPusher(int number_thread) {
    this.number_thread = number_thread;
}
252
Naoki Shiotac1601d32013-11-20 10:47:34 -0800253 /**
254 * Set parameters needed for sending messages.
255 * @param context FloodlightContext used for sending messages.
256 * If null, FlowPusher uses default context.
257 * @param modContext FloodlightModuleContext used for acquiring
258 * ThreadPoolService and registering MessageListener.
259 * @param factory Factory object to create OFMessage objects.
260 * @param damper Message damper used for sending messages.
261 * If null, FlowPusher creates its own damper object.
262 */
263 public void init(FloodlightContext context,
264 FloodlightModuleContext modContext,
265 BasicFactory factory,
266 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700267 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800268 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800269 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
270 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
271 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -0800272 flowManager = modContext.getServiceImpl(IFlowService.class);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800273
274 if (damper != null) {
275 messageDamper = damper;
276 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800277 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800278 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
279 EnumSet.of(OFType.FLOW_MOD),
280 OFMESSAGE_DAMPER_TIMEOUT);
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800283
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284 /**
285 * Begin processing queue.
286 */
287 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800288 if (factory == null) {
289 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 return;
291 }
292
Naoki Shiota81dbe302013-11-21 15:35:38 -0800293 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800294 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800295 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800296
Naoki Shiota81dbe302013-11-21 15:35:38 -0800297 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800298 thread.start();
299 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700300 }
301
Brian O'Connor8c166a72013-11-14 18:41:48 -0800302 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800303 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800304 SwitchQueue queue = getQueue(sw);
305
306 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800307 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800308 }
309
310 synchronized (queue) {
311 if (queue.state == QueueState.READY) {
312 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800313 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
317 }
318
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 SwitchQueue queue = getQueue(sw);
322
323 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800324 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800325 }
326
327 synchronized (queue) {
328 if (queue.state == QueueState.SUSPENDED) {
329 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800330
331 // Latch down if queue is not empty
332 FlowPusherThread thread = getProcess(sw);
333 if (! queue.isEmpty() &&
334 thread.mutex.availablePermits() == 0) {
335 thread.mutex.release();
336 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800337 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800338 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800339 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800340 }
341 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800342
Brian O'Connor8c166a72013-11-14 18:41:48 -0800343 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800344 public boolean isSuspended(IOFSwitch sw) {
345 SwitchQueue queue = getQueue(sw);
346
347 if (queue == null) {
348 // TODO Is true suitable for this case?
349 return true;
350 }
351
352 return (queue.state == QueueState.SUSPENDED);
353 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800354
355 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800356 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800357 */
358 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800359 if (threadMap == null) {
360 return;
361 }
362
Naoki Shiota81dbe302013-11-21 15:35:38 -0800363 for (FlowPusherThread t : threadMap.values()) {
364 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800365 }
366 }
367
Naoki Shiotae3199732013-11-25 16:14:43 -0800368 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800369 public void setRate(IOFSwitch sw, long rate) {
370 SwitchQueue queue = getQueue(sw);
371 if (queue == null) {
372 return;
373 }
374
375 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800376 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800377 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700378 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700379 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800380
381 @Override
382 public boolean createQueue(IOFSwitch sw) {
383 SwitchQueue queue = getQueue(sw);
384 if (queue != null) {
385 return false;
386 }
387
388 FlowPusherThread proc = getProcess(sw);
389 queue = new SwitchQueue();
390 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800391 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800392 proc.queues.put(sw, queue);
393 }
394
395 return true;
396 }
397
398 @Override
399 public boolean deleteQueue(IOFSwitch sw) {
400 return deleteQueue(sw, false);
401 }
402
403 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800404 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800405 FlowPusherThread proc = getProcess(sw);
406
Naoki Shiotab485d412013-11-26 12:04:19 -0800407 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800408 synchronized (proc.queues) {
409 SwitchQueue queue = proc.queues.remove(sw);
410 if (queue == null) {
411 return false;
412 }
413 }
414 return true;
415 } else {
416 SwitchQueue queue = getQueue(sw);
417 if (queue == null) {
418 return false;
419 }
420 synchronized (queue) {
421 queue.toBeDeleted = true;
422 }
423 return true;
424 }
425 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700426
Brian O'Connor8c166a72013-11-14 18:41:48 -0800427 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800428 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800429 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800430 SwitchQueue queue = proc.queues.get(sw);
431
Naoki Shiotab485d412013-11-26 12:04:19 -0800432 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800433 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800434 createQueue(sw);
435 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800436 }
437
438 synchronized (queue) {
439 queue.add(msg);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800440// log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800441 }
442
Naoki Shiota81dbe302013-11-21 15:35:38 -0800443 if (proc.mutex.availablePermits() == 0) {
444 proc.mutex.release();
445 }
446
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800447 return true;
448 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800449
Brian O'Connor8c166a72013-11-14 18:41:48 -0800450 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800451 public void pushFlowEntries(
452 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
453
454 List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
455 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
456
457 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Pavlin Radoslavov4535bc12013-12-05 10:43:49 -0800458 if (add(entry.first, entry.second)) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800459 pushedEntries.add(entry);
460 }
461 }
462
463 //
464 // TODO: We should use the OpenFlow Barrier mechanism
465 // to check for errors, and update the SwitchState
466 // for a flow entry after the Barrier message is
467 // is received.
468 // Only after inform the Flow Manager that the entry is pushed.
469 //
470 flowManager.flowEntriesPushedToSwitch(pushedEntries);
471 }
472
473 @Override
474 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
475 Collection<Pair<IOFSwitch, FlowEntry>> entries =
476 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
477
478 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
479 pushFlowEntries(entries);
480 }
481
/**
 * Create a message from FlowEntry and add it to the queue of the switch.
 *
 * The method builds an OFFlowMod in three steps: it selects the flow-mod
 * command from the entry's user state, translates the entry's match
 * conditions into an OFMatch, and translates the entry's actions into
 * OpenFlow actions. The finished message is handed to
 * {@code add(IOFSwitch, OFMessage)}.
 * @param sw Switch to which message is pushed.
 * @param flowEntry FlowEntry object used for creating message.
 * @return true if message is successfully added to a queue; false if the
 *         entry has an unknown user state or the message could not be queued.
 */
private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
    //
    // Create the OpenFlow Flow Modification Entry to push
    //
    OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
    // The flow entry ID doubles as the OpenFlow cookie.
    long cookie = flowEntry.flowEntryId().value();

    // Map the user state to the flow-mod command. Every branch below either
    // assigns the command or returns, so the initial value is never used
    // as-is.
    short flowModCommand = OFFlowMod.OFPFC_ADD;
    if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
        flowModCommand = OFFlowMod.OFPFC_ADD;
    } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
        flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
    } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
        flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
    } else {
        // Unknown user state. Ignore the entry
        log.debug(
            "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
            flowEntry.flowEntryId(),
            flowEntry.flowEntryUserState());
        return false;
    }

    //
    // Fetch the match conditions.
    //
    // NOTE: The Flow matching conditions common for all Flow Entries are
    // used ONLY if a Flow Entry does NOT have the corresponding matching
    // condition set.
    //
    // Start with everything wildcarded; each condition that is present
    // sets its field and clears the corresponding wildcard bit.
    OFMatch match = new OFMatch();
    match.setWildcards(OFMatch.OFPFW_ALL);
    FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();

    // Match the Incoming Port
    Port matchInPort = flowEntryMatch.inPort();
    if (matchInPort != null) {
        match.setInputPort(matchInPort.value());
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
    }

    // Match the Source MAC address
    MACAddress matchSrcMac = flowEntryMatch.srcMac();
    if (matchSrcMac != null) {
        match.setDataLayerSource(matchSrcMac.toString());
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
    }

    // Match the Destination MAC address
    MACAddress matchDstMac = flowEntryMatch.dstMac();
    if (matchDstMac != null) {
        match.setDataLayerDestination(matchDstMac.toString());
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
    }

    // Match the Ethernet Frame Type
    Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
    if (matchEthernetFrameType != null) {
        match.setDataLayerType(matchEthernetFrameType);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
    }

    // Match the VLAN ID
    Short matchVlanId = flowEntryMatch.vlanId();
    if (matchVlanId != null) {
        match.setDataLayerVirtualLan(matchVlanId);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
    }

    // Match the VLAN priority
    Byte matchVlanPriority = flowEntryMatch.vlanPriority();
    if (matchVlanPriority != null) {
        match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
        match.setWildcards(match.getWildcards()
                & ~OFMatch.OFPFW_DL_VLAN_PCP);
    }

    // Match the Source IPv4 Network prefix
    // NOTE(review): no wildcard bits are cleared explicitly for the IPv4
    // prefixes; presumably setFromCIDR() updates them itself - confirm.
    IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
    if (matchSrcIPv4Net != null) {
        match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
    }

    // Match the Destination IPv4 Network prefix
    IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
    if (matchDstIPv4Net != null) {
        match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
    }

    // Match the IP protocol
    Byte matchIpProto = flowEntryMatch.ipProto();
    if (matchIpProto != null) {
        match.setNetworkProtocol(matchIpProto);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
    }

    // Match the IP ToS (DSCP field, 6 bits)
    Byte matchIpToS = flowEntryMatch.ipToS();
    if (matchIpToS != null) {
        match.setNetworkTypeOfService(matchIpToS);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
    }

    // Match the Source TCP/UDP port
    Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
    if (matchSrcTcpUdpPort != null) {
        match.setTransportSource(matchSrcTcpUdpPort);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
    }

    // Match the Destination TCP/UDP port
    Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
    if (matchDstTcpUdpPort != null) {
        match.setTransportDestination(matchDstTcpUdpPort);
        match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
    }

    //
    // Fetch the actions
    //
    // actionOutputPort remembers the port of the last output action seen;
    // it is used for the out-port of delete commands and for logging.
    // actionsLen accumulates the encoded length of all actions, which is
    // needed for the total message length below.
    Short actionOutputPort = null;
    List<OFAction> openFlowActions = new ArrayList<OFAction>();
    int actionsLen = 0;
    FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
    //
    for (FlowEntryAction action : flowEntryActions.actions()) {
        // Fetch every possible action variant; each one that is non-null
        // is translated into the corresponding OpenFlow action below.
        ActionOutput actionOutput = action.actionOutput();
        ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
        ActionSetVlanPriority actionSetVlanPriority = action
                .actionSetVlanPriority();
        ActionStripVlan actionStripVlan = action.actionStripVlan();
        ActionSetEthernetAddr actionSetEthernetSrcAddr = action
                .actionSetEthernetSrcAddr();
        ActionSetEthernetAddr actionSetEthernetDstAddr = action
                .actionSetEthernetDstAddr();
        ActionSetIPv4Addr actionSetIPv4SrcAddr = action
                .actionSetIPv4SrcAddr();
        ActionSetIPv4Addr actionSetIPv4DstAddr = action
                .actionSetIPv4DstAddr();
        ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
        ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
                .actionSetTcpUdpSrcPort();
        ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
                .actionSetTcpUdpDstPort();
        ActionEnqueue actionEnqueue = action.actionEnqueue();

        if (actionOutput != null) {
            actionOutputPort = actionOutput.port().value();
            // XXX: The max length is hard-coded for now
            OFActionOutput ofa = new OFActionOutput(actionOutput.port()
                    .value(), (short) 0xffff);
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetVlanId != null) {
            OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
                    actionSetVlanId.vlanId());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetVlanPriority != null) {
            OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
                    actionSetVlanPriority.vlanPriority());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionStripVlan != null) {
            // The strip action is only emitted when the flag is set.
            if (actionStripVlan.stripVlan() == true) {
                OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
                openFlowActions.add(ofa);
                actionsLen += ofa.getLength();
            }
        }

        if (actionSetEthernetSrcAddr != null) {
            OFActionDataLayerSource ofa = new OFActionDataLayerSource(
                    actionSetEthernetSrcAddr.addr().toBytes());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetEthernetDstAddr != null) {
            OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
                    actionSetEthernetDstAddr.addr().toBytes());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetIPv4SrcAddr != null) {
            OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
                    actionSetIPv4SrcAddr.addr().value());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetIPv4DstAddr != null) {
            OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
                    actionSetIPv4DstAddr.addr().value());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetIpToS != null) {
            OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
                    actionSetIpToS.ipToS());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetTcpUdpSrcPort != null) {
            OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
                    actionSetTcpUdpSrcPort.port());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionSetTcpUdpDstPort != null) {
            OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
                    actionSetTcpUdpDstPort.port());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }

        if (actionEnqueue != null) {
            OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
                    .value(), actionEnqueue.queueId());
            openFlowActions.add(ofa);
            actionsLen += ofa.getLength();
        }
    }

    // Assemble the flow-mod. The total length is the fixed flow-mod length
    // plus the accumulated length of all actions.
    fm.setIdleTimeout((short)flowEntry.idleTimeout())
        .setHardTimeout((short)flowEntry.hardTimeout())
        .setPriority((short)flowEntry.priority())
        .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
        .setCommand(flowModCommand).setMatch(match)
        .setActions(openFlowActions)
        .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
    // The out-port defaults to "none"; for delete commands it is replaced
    // by the entry's output port when one was found. Only
    // OFPFC_DELETE_STRICT can actually occur here, because OFPFC_DELETE is
    // never assigned above.
    fm.setOutPort(OFPort.OFPP_NONE.getValue());
    if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
        || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
        if (actionOutputPort != null)
            fm.setOutPort(actionOutputPort);
    }

    //
    // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
    // permanent.
    //
    if ((flowEntry.idleTimeout() != 0) ||
        (flowEntry.hardTimeout() != 0)) {
        fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
    }

    //
    // Write the message to the switch
    //
    log.debug("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
        , flowEntry.flowEntryUserState()
        , sw.getStringId()
        , flowEntry.flowEntryId()
        , matchSrcMac
        , matchDstMac
        , matchInPort
        , actionOutputPort
        );

    // The message is queued here; the processing thread performs the
    // actual transmission.
    return add(sw, fm);
}
Naoki Shiotac1601d32013-11-20 10:47:34 -0800760
761 @Override
762 public OFBarrierReply barrier(IOFSwitch sw) {
763 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
764 if (future == null) {
765 return null;
766 }
767
768 try {
769 return future.get();
770 } catch (InterruptedException e) {
771 e.printStackTrace();
772 log.error("InterruptedException: {}", e);
773 return null;
774 } catch (ExecutionException e) {
775 e.printStackTrace();
776 log.error("ExecutionException: {}", e);
777 return null;
778 }
779 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800780
Naoki Shiotac1601d32013-11-20 10:47:34 -0800781 @Override
782 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
783 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800784
785 if (sw == null) {
786 return null;
787 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800788
Naoki Shiota81dbe302013-11-21 15:35:38 -0800789 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800790 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800791
Naoki Shiotac1601d32013-11-20 10:47:34 -0800792 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800793 synchronized (barrierFutures) {
794 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
795 if (map == null) {
796 map = new HashMap<Integer,OFBarrierReplyFuture>();
797 barrierFutures.put(sw.getId(), map);
798 }
799 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800800 }
801
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800802 add(sw, msg);
803
Naoki Shiotac1601d32013-11-20 10:47:34 -0800804 return future;
805 }
806
Naoki Shiotae3199732013-11-25 16:14:43 -0800807 /**
808 * Get a queue attached to a switch.
809 * @param sw Switch object
810 * @return Queue object
811 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800812 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800813 if (sw == null) {
814 return null;
815 }
816
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800817 return getProcess(sw).queues.get(sw);
818 }
819
Naoki Shiotae3199732013-11-25 16:14:43 -0800820 /**
821 * Get a hash value correspondent to a switch.
822 * @param sw Switch object
823 * @return Hash value
824 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800825 protected long getHash(IOFSwitch sw) {
826 // This code assumes DPID is sequentially assigned.
827 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800828 return sw.getId() % number_thread;
829 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800830
831 /**
832 * Get a Thread object which processes the queue attached to a switch.
833 * @param sw Switch object
834 * @return Thread object
835 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800836 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800837 long hash = getHash(sw);
838
839 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800840 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800841
842 @Override
843 public String getName() {
844 return "flowpusher";
845 }
846
847 @Override
848 public boolean isCallbackOrderingPrereq(OFType type, String name) {
849 return false;
850 }
851
852 @Override
853 public boolean isCallbackOrderingPostreq(OFType type, String name) {
854 return false;
855 }
856
857 @Override
858 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -0800859 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
860 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800861 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800862 return Command.CONTINUE;
863 }
864
865 OFBarrierReplyFuture future = map.get(msg.getXid());
866 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800867 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800868 return Command.CONTINUE;
869 }
870
871 log.debug("Received BARRIER_REPLY : {}", msg);
872 future.deliverFuture(sw, msg);
873
874 return Command.CONTINUE;
875 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800876
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700877}