blob: 853591a2fa9c1c7cca93e2c51bbed5a2d8e77412 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080013import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080014import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080015import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070016
Naoki Shiota7d0cf272013-11-05 10:18:12 -080017import org.openflow.protocol.*;
18import org.openflow.protocol.action.*;
19import org.openflow.protocol.factory.BasicFactory;
20import org.slf4j.Logger;
21import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070022
23import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.IFloodlightProviderService;
25import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080027import net.floodlightcontroller.core.internal.OFMessageFuture;
28import net.floodlightcontroller.core.module.FloodlightModuleContext;
29import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080030import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080031import net.floodlightcontroller.util.OFMessageDamper;
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -080032import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryAction;
34import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryActions;
37import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080038import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
39import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080040import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080041import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080042import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070043
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor, and
 * one thread can handle multiple message queues. Each queue is assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the switches
 * in round-robin order. The processing thread also calculates the sending rate to
 * suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080056public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -080058 protected volatile IFlowService flowManager;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059
60 // NOTE: Below are moved from FlowManager.
61 // TODO: Values copied from elsewhere (class LearningSwitch).
62 // The local copy should go away!
63 //
64 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
65 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080066
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080067 // Number of messages sent to switch at once
68 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080069
70 public static final short PRIORITY_DEFAULT = 100;
71 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
72 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
73
/** Processing state of the message queue attached to a switch. */
public enum QueueState {
    /** The processing thread sends the messages held by the queue. */
    READY,
    /** The processing thread skips the queue. */
    SUSPENDED
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070078
Naoki Shiotac1601d32013-11-20 10:47:34 -080079 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080080 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080081 * This consists of queue itself and variables used for limiting sending rate.
82 * @author Naoki Shiota
83 *
84 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080085 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080086 private class SwitchQueue extends ArrayDeque<OFMessage> {
87 QueueState state;
88
Naoki Shiotae3199732013-11-25 16:14:43 -080089 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080090 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070091 long last_sent_time = 0;
92 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080093
Naoki Shiotae3199732013-11-25 16:14:43 -080094 // "To be deleted" flag
95 boolean toBeDeleted = false;
96
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097 /**
98 * Check if sending rate is within the rate
99 * @param current Current time
100 * @return true if within the rate
101 */
102 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800103 if (max_rate == 0) {
104 // no limitation
105 return true;
106 }
107
Naoki Shiota81dbe302013-11-21 15:35:38 -0800108 if (current == last_sent_time) {
109 return false;
110 }
111
Naoki Shiotac1601d32013-11-20 10:47:34 -0800112 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800113 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800114 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800115 }
116
Naoki Shiota81dbe302013-11-21 15:35:38 -0800117 /**
118 * Log time and size of last sent data.
119 * @param current Time to be sent.
120 * @param size Size of sent data (in bytes).
121 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800122 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800124 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800125 }
126
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700127 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800128
Naoki Shiotac1601d32013-11-20 10:47:34 -0800129 private OFMessageDamper messageDamper = null;
130 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700131
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800132 private FloodlightContext context = null;
133 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800134
135 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800136 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800137 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800138 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
139 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800141 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800142
Naoki Shiota8739faa2013-11-18 17:00:25 -0800143 /**
144 * Main thread that reads messages from queues and sends them to switches.
145 * @author Naoki Shiota
146 *
147 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800148 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800149 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800150 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800151
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800152 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800153 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800154
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700155 @Override
156 public void run() {
157 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800158 try {
159 // wait for message pushed to queue
160 mutex.acquire();
161 } catch (InterruptedException e) {
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800162 // not an error
Naoki Shiota81dbe302013-11-21 15:35:38 -0800163 log.debug("FlowPusherThread is interrupted");
164 return;
165 }
166
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800167 // for safety of concurrent access, copy all key objects
168 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800169 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800170 for (IOFSwitch sw : queues.keySet()) {
171 keys.add(sw);
172 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800173 }
174
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800175 for (IOFSwitch sw : keys) {
176 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800177
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700178 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800179 if (sw == null || queue == null ||
180 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700181 continue;
182 }
183
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800184 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800185 processQueue(sw, queue, MAX_MESSAGE_SEND);
186 if (queue.isEmpty()) {
187 // remove queue if flagged to be.
188 if (queue.toBeDeleted) {
189 synchronized (queues) {
190 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800191 }
192 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800193 } else {
194 // if some messages remains in queue, latch down
195 if (mutex.availablePermits() == 0) {
196 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800197 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700198 }
199 }
200 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700201 }
202 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800203
204 /**
205 * Read messages from queue and send them to the switch.
206 * If number of messages excess the limit, stop sending messages.
207 * @param sw Switch to which messages will be sent.
208 * @param queue Queue of messages.
209 * @param max_msg Limitation of number of messages to be sent. If set to 0,
210 * all messages in queue will be sent.
211 */
212 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
213 // check sending rate and determine it to be sent or not
214 long current_time = System.currentTimeMillis();
215 long size = 0;
216
217 if (queue.isSendable(current_time)) {
218 int i = 0;
219 while (! queue.isEmpty()) {
220 // Number of messages excess the limit
221 if (0 < max_msg && max_msg <= i) {
222 break;
223 }
224 ++i;
225
226 OFMessage msg = queue.poll();
227 try {
228 messageDamper.write(sw, msg, context);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800229// log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800230 size += msg.getLength();
231 } catch (IOException e) {
232 e.printStackTrace();
233 log.error("Exception in sending message ({}) : {}", msg, e);
234 }
235 }
236 sw.flush();
237 queue.logSentData(current_time, size);
238 }
239 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700240 }
241
/**
 * Initialize object with one thread.
 */
public FlowPusher() {
    this(1);
}
247
/**
 * Initialize object with the given number of threads.
 * @param number_thread Number of threads to handle messages.
 */
public FlowPusher(int number_thread) {
    super();
    this.number_thread = number_thread;
}
255
Naoki Shiotac1601d32013-11-20 10:47:34 -0800256 /**
257 * Set parameters needed for sending messages.
258 * @param context FloodlightContext used for sending messages.
259 * If null, FlowPusher uses default context.
260 * @param modContext FloodlightModuleContext used for acquiring
261 * ThreadPoolService and registering MessageListener.
262 * @param factory Factory object to create OFMessage objects.
263 * @param damper Message damper used for sending messages.
264 * If null, FlowPusher creates its own damper object.
265 */
266 public void init(FloodlightContext context,
267 FloodlightModuleContext modContext,
268 BasicFactory factory,
269 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700270 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800271 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800272 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
273 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
274 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Pavlin Radoslavovda0ab442013-12-04 14:08:58 -0800275 flowManager = modContext.getServiceImpl(IFlowService.class);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800276
277 if (damper != null) {
278 messageDamper = damper;
279 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800280 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800281 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
282 EnumSet.of(OFType.FLOW_MOD),
283 OFMESSAGE_DAMPER_TIMEOUT);
284 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700285 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800286
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800287 /**
288 * Begin processing queue.
289 */
290 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800291 if (factory == null) {
292 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800293 return;
294 }
295
Naoki Shiota81dbe302013-11-21 15:35:38 -0800296 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800297 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800298 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800299
Naoki Shiota81dbe302013-11-21 15:35:38 -0800300 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800301 thread.start();
302 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700303 }
304
Brian O'Connor8c166a72013-11-14 18:41:48 -0800305 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800306 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800307 SwitchQueue queue = getQueue(sw);
308
309 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800310 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800311 }
312
313 synchronized (queue) {
314 if (queue.state == QueueState.READY) {
315 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800316 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800317 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800318 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800319 }
320 }
321
Brian O'Connor8c166a72013-11-14 18:41:48 -0800322 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800323 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800324 SwitchQueue queue = getQueue(sw);
325
326 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800327 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800328 }
329
330 synchronized (queue) {
331 if (queue.state == QueueState.SUSPENDED) {
332 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800333
334 // Latch down if queue is not empty
335 FlowPusherThread thread = getProcess(sw);
336 if (! queue.isEmpty() &&
337 thread.mutex.availablePermits() == 0) {
338 thread.mutex.release();
339 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800340 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800341 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800342 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800343 }
344 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800345
Brian O'Connor8c166a72013-11-14 18:41:48 -0800346 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800347 public boolean isSuspended(IOFSwitch sw) {
348 SwitchQueue queue = getQueue(sw);
349
350 if (queue == null) {
351 // TODO Is true suitable for this case?
352 return true;
353 }
354
355 return (queue.state == QueueState.SUSPENDED);
356 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800357
358 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800359 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800360 */
361 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800362 if (threadMap == null) {
363 return;
364 }
365
Naoki Shiota81dbe302013-11-21 15:35:38 -0800366 for (FlowPusherThread t : threadMap.values()) {
367 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800368 }
369 }
370
Naoki Shiotae3199732013-11-25 16:14:43 -0800371 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800372 public void setRate(IOFSwitch sw, long rate) {
373 SwitchQueue queue = getQueue(sw);
374 if (queue == null) {
375 return;
376 }
377
378 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800379 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800380 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700381 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700382 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800383
384 @Override
385 public boolean createQueue(IOFSwitch sw) {
386 SwitchQueue queue = getQueue(sw);
387 if (queue != null) {
388 return false;
389 }
390
391 FlowPusherThread proc = getProcess(sw);
392 queue = new SwitchQueue();
393 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800394 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800395 proc.queues.put(sw, queue);
396 }
397
398 return true;
399 }
400
401 @Override
402 public boolean deleteQueue(IOFSwitch sw) {
403 return deleteQueue(sw, false);
404 }
405
406 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800407 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800408 FlowPusherThread proc = getProcess(sw);
409
Naoki Shiotab485d412013-11-26 12:04:19 -0800410 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800411 synchronized (proc.queues) {
412 SwitchQueue queue = proc.queues.remove(sw);
413 if (queue == null) {
414 return false;
415 }
416 }
417 return true;
418 } else {
419 SwitchQueue queue = getQueue(sw);
420 if (queue == null) {
421 return false;
422 }
423 synchronized (queue) {
424 queue.toBeDeleted = true;
425 }
426 return true;
427 }
428 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700429
Brian O'Connor8c166a72013-11-14 18:41:48 -0800430 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800431 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800432 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800433 SwitchQueue queue = proc.queues.get(sw);
434
Naoki Shiotab485d412013-11-26 12:04:19 -0800435 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800436 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800437 createQueue(sw);
438 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800439 }
440
441 synchronized (queue) {
442 queue.add(msg);
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800443// log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800444 }
445
Naoki Shiota81dbe302013-11-21 15:35:38 -0800446 if (proc.mutex.availablePermits() == 0) {
447 proc.mutex.release();
448 }
449
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800450 return true;
451 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800452
Brian O'Connor8c166a72013-11-14 18:41:48 -0800453 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800454 public void pushFlowEntries(
455 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
456
457 List<Pair<IOFSwitch, FlowEntry>> pushedEntries =
458 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
459
460 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
461 if (add(entry.left, entry.right)) {
462 pushedEntries.add(entry);
463 }
464 }
465
466 //
467 // TODO: We should use the OpenFlow Barrier mechanism
468 // to check for errors, and update the SwitchState
469 // for a flow entry after the Barrier message is
470 // is received.
471 // Only after inform the Flow Manager that the entry is pushed.
472 //
473 flowManager.flowEntriesPushedToSwitch(pushedEntries);
474 }
475
476 @Override
477 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
478 Collection<Pair<IOFSwitch, FlowEntry>> entries =
479 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
480
481 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
482 pushFlowEntries(entries);
483 }
484
485 /**
486 * Create a message from FlowEntry and add it to the queue of the switch.
487 * @param sw Switch to which message is pushed.
488 * @param flowEntry FlowEntry object used for creating message.
489 * @return true if message is successfully added to a queue.
490 */
491 private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800492 //
493 // Create the OpenFlow Flow Modification Entry to push
494 //
495 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
496 long cookie = flowEntry.flowEntryId().value();
497
498 short flowModCommand = OFFlowMod.OFPFC_ADD;
499 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
500 flowModCommand = OFFlowMod.OFPFC_ADD;
501 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
502 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
503 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
504 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
505 } else {
506 // Unknown user state. Ignore the entry
507 log.debug(
508 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
509 flowEntry.flowEntryId().toString(),
510 flowEntry.flowEntryUserState());
511 return false;
512 }
513
514 //
515 // Fetch the match conditions.
516 //
517 // NOTE: The Flow matching conditions common for all Flow Entries are
518 // used ONLY if a Flow Entry does NOT have the corresponding matching
519 // condition set.
520 //
521 OFMatch match = new OFMatch();
522 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800523 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
524
525 // Match the Incoming Port
526 Port matchInPort = flowEntryMatch.inPort();
527 if (matchInPort != null) {
528 match.setInputPort(matchInPort.value());
529 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
530 }
531
532 // Match the Source MAC address
533 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800534 if (matchSrcMac != null) {
535 match.setDataLayerSource(matchSrcMac.toString());
536 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
537 }
538
539 // Match the Destination MAC address
540 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800541 if (matchDstMac != null) {
542 match.setDataLayerDestination(matchDstMac.toString());
543 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
544 }
545
546 // Match the Ethernet Frame Type
547 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800548 if (matchEthernetFrameType != null) {
549 match.setDataLayerType(matchEthernetFrameType);
550 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
551 }
552
553 // Match the VLAN ID
554 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800555 if (matchVlanId != null) {
556 match.setDataLayerVirtualLan(matchVlanId);
557 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
558 }
559
560 // Match the VLAN priority
561 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800562 if (matchVlanPriority != null) {
563 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
564 match.setWildcards(match.getWildcards()
565 & ~OFMatch.OFPFW_DL_VLAN_PCP);
566 }
567
568 // Match the Source IPv4 Network prefix
569 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800570 if (matchSrcIPv4Net != null) {
571 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
572 }
573
574 // Natch the Destination IPv4 Network prefix
575 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800576 if (matchDstIPv4Net != null) {
577 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
578 }
579
580 // Match the IP protocol
581 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800582 if (matchIpProto != null) {
583 match.setNetworkProtocol(matchIpProto);
584 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
585 }
586
587 // Match the IP ToS (DSCP field, 6 bits)
588 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800589 if (matchIpToS != null) {
590 match.setNetworkTypeOfService(matchIpToS);
591 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
592 }
593
594 // Match the Source TCP/UDP port
595 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800596 if (matchSrcTcpUdpPort != null) {
597 match.setTransportSource(matchSrcTcpUdpPort);
598 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
599 }
600
601 // Match the Destination TCP/UDP port
602 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800603 if (matchDstTcpUdpPort != null) {
604 match.setTransportDestination(matchDstTcpUdpPort);
605 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
606 }
607
608 //
609 // Fetch the actions
610 //
611 Short actionOutputPort = null;
612 List<OFAction> openFlowActions = new ArrayList<OFAction>();
613 int actionsLen = 0;
614 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
615 //
616 for (FlowEntryAction action : flowEntryActions.actions()) {
617 ActionOutput actionOutput = action.actionOutput();
618 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
619 ActionSetVlanPriority actionSetVlanPriority = action
620 .actionSetVlanPriority();
621 ActionStripVlan actionStripVlan = action.actionStripVlan();
622 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
623 .actionSetEthernetSrcAddr();
624 ActionSetEthernetAddr actionSetEthernetDstAddr = action
625 .actionSetEthernetDstAddr();
626 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
627 .actionSetIPv4SrcAddr();
628 ActionSetIPv4Addr actionSetIPv4DstAddr = action
629 .actionSetIPv4DstAddr();
630 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
631 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
632 .actionSetTcpUdpSrcPort();
633 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
634 .actionSetTcpUdpDstPort();
635 ActionEnqueue actionEnqueue = action.actionEnqueue();
636
637 if (actionOutput != null) {
638 actionOutputPort = actionOutput.port().value();
639 // XXX: The max length is hard-coded for now
640 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
641 .value(), (short) 0xffff);
642 openFlowActions.add(ofa);
643 actionsLen += ofa.getLength();
644 }
645
646 if (actionSetVlanId != null) {
647 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
648 actionSetVlanId.vlanId());
649 openFlowActions.add(ofa);
650 actionsLen += ofa.getLength();
651 }
652
653 if (actionSetVlanPriority != null) {
654 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
655 actionSetVlanPriority.vlanPriority());
656 openFlowActions.add(ofa);
657 actionsLen += ofa.getLength();
658 }
659
660 if (actionStripVlan != null) {
661 if (actionStripVlan.stripVlan() == true) {
662 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
663 openFlowActions.add(ofa);
664 actionsLen += ofa.getLength();
665 }
666 }
667
668 if (actionSetEthernetSrcAddr != null) {
669 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
670 actionSetEthernetSrcAddr.addr().toBytes());
671 openFlowActions.add(ofa);
672 actionsLen += ofa.getLength();
673 }
674
675 if (actionSetEthernetDstAddr != null) {
676 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
677 actionSetEthernetDstAddr.addr().toBytes());
678 openFlowActions.add(ofa);
679 actionsLen += ofa.getLength();
680 }
681
682 if (actionSetIPv4SrcAddr != null) {
683 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
684 actionSetIPv4SrcAddr.addr().value());
685 openFlowActions.add(ofa);
686 actionsLen += ofa.getLength();
687 }
688
689 if (actionSetIPv4DstAddr != null) {
690 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
691 actionSetIPv4DstAddr.addr().value());
692 openFlowActions.add(ofa);
693 actionsLen += ofa.getLength();
694 }
695
696 if (actionSetIpToS != null) {
697 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
698 actionSetIpToS.ipToS());
699 openFlowActions.add(ofa);
700 actionsLen += ofa.getLength();
701 }
702
703 if (actionSetTcpUdpSrcPort != null) {
704 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
705 actionSetTcpUdpSrcPort.port());
706 openFlowActions.add(ofa);
707 actionsLen += ofa.getLength();
708 }
709
710 if (actionSetTcpUdpDstPort != null) {
711 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
712 actionSetTcpUdpDstPort.port());
713 openFlowActions.add(ofa);
714 actionsLen += ofa.getLength();
715 }
716
717 if (actionEnqueue != null) {
718 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
719 .value(), actionEnqueue.queueId());
720 openFlowActions.add(ofa);
721 actionsLen += ofa.getLength();
722 }
723 }
724
725 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
726 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
727 .setPriority(PRIORITY_DEFAULT)
728 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
729 .setCommand(flowModCommand).setMatch(match)
730 .setActions(openFlowActions)
731 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
732 fm.setOutPort(OFPort.OFPP_NONE.getValue());
733 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
734 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
735 if (actionOutputPort != null)
736 fm.setOutPort(actionOutputPort);
737 }
738
739 //
740 // TODO: Set the following flag
741 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
742 // See method ForwardingBase::pushRoute()
743 //
744
745 //
746 // Write the message to the switch
747 //
Pavlin Radoslavovf0678902013-12-03 15:06:56 -0800748 log.debug("Installing flow entry "
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800749 + flowEntry.flowEntryUserState() + " into switch DPID: "
750 + sw.getStringId() + " flowEntryId: "
751 + flowEntry.flowEntryId().toString() + " srcMac: "
752 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
753 + matchInPort + " outPort: " + actionOutputPort);
754
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800755 return add(sw, fm);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800756 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800757
758 @Override
759 public OFBarrierReply barrier(IOFSwitch sw) {
760 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
761 if (future == null) {
762 return null;
763 }
764
765 try {
766 return future.get();
767 } catch (InterruptedException e) {
768 e.printStackTrace();
769 log.error("InterruptedException: {}", e);
770 return null;
771 } catch (ExecutionException e) {
772 e.printStackTrace();
773 log.error("ExecutionException: {}", e);
774 return null;
775 }
776 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800777
Naoki Shiotac1601d32013-11-20 10:47:34 -0800778 @Override
779 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
780 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800781
782 if (sw == null) {
783 return null;
784 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800785
Naoki Shiota81dbe302013-11-21 15:35:38 -0800786 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800787 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800788
Naoki Shiotac1601d32013-11-20 10:47:34 -0800789 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800790 synchronized (barrierFutures) {
791 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
792 if (map == null) {
793 map = new HashMap<Integer,OFBarrierReplyFuture>();
794 barrierFutures.put(sw.getId(), map);
795 }
796 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800797 }
798
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800799 add(sw, msg);
800
Naoki Shiotac1601d32013-11-20 10:47:34 -0800801 return future;
802 }
803
Naoki Shiotae3199732013-11-25 16:14:43 -0800804 /**
805 * Get a queue attached to a switch.
806 * @param sw Switch object
807 * @return Queue object
808 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800809 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800810 if (sw == null) {
811 return null;
812 }
813
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800814 return getProcess(sw).queues.get(sw);
815 }
816
Naoki Shiotae3199732013-11-25 16:14:43 -0800817 /**
818 * Get a hash value correspondent to a switch.
819 * @param sw Switch object
820 * @return Hash value
821 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800822 protected long getHash(IOFSwitch sw) {
823 // This code assumes DPID is sequentially assigned.
824 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800825 return sw.getId() % number_thread;
826 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800827
828 /**
829 * Get a Thread object which processes the queue attached to a switch.
830 * @param sw Switch object
831 * @return Thread object
832 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800833 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800834 long hash = getHash(sw);
835
836 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800837 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800838
839 @Override
840 public String getName() {
841 return "flowpusher";
842 }
843
844 @Override
845 public boolean isCallbackOrderingPrereq(OFType type, String name) {
846 return false;
847 }
848
849 @Override
850 public boolean isCallbackOrderingPostreq(OFType type, String name) {
851 return false;
852 }
853
854 @Override
855 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -0800856 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
857 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800858 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800859 return Command.CONTINUE;
860 }
861
862 OFBarrierReplyFuture future = map.get(msg.getXid());
863 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800864 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800865 return Command.CONTINUE;
866 }
867
868 log.debug("Received BARRIER_REPLY : {}", msg);
869 future.deliverFuture(sw, msg);
870
871 return Command.CONTINUE;
872 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800873
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700874}