blob: ba3602ce1254de5c5624546d58a3545fc0ef32ff [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080011import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080012import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070013
Naoki Shiota7d0cf272013-11-05 10:18:12 -080014import org.openflow.protocol.*;
15import org.openflow.protocol.action.*;
16import org.openflow.protocol.factory.BasicFactory;
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
20import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080027import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080028import net.floodlightcontroller.util.OFMessageDamper;
29import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080033import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080034import net.onrc.onos.ofcontroller.util.FlowEntryActions;
35import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080036import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
37import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
38import net.onrc.onos.ofcontroller.util.FlowPath;
39import net.onrc.onos.ofcontroller.util.IPv4Net;
40import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070041
42/**
Naoki Shiota8739faa2013-11-18 17:00:25 -080043 * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
44 * messages to switches in proper rate.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045 * @author Naoki Shiota
46 *
47 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080048public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080049 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
50
Naoki Shiotae3199732013-11-25 16:14:43 -080051 // If this flag is true, barrier message will be sent when queue goes empty.
Naoki Shiota81dbe302013-11-21 15:35:38 -080052 private static boolean barrierIfEmpty = false;
53
Naoki Shiota7d0cf272013-11-05 10:18:12 -080054 // NOTE: Below are moved from FlowManager.
55 // TODO: Values copied from elsewhere (class LearningSwitch).
56 // The local copy should go away!
57 //
58 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
59 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080060
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080061 // Number of messages sent to switch at once
62 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080063
64 public static final short PRIORITY_DEFAULT = 100;
65 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
66 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
67
68 public enum QueueState {
69 READY,
70 SUSPENDED,
71 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072
Naoki Shiotac1601d32013-11-20 10:47:34 -080073 /**
74 * Message queue attached to a switch.
75 * This consists of queue itself and variables used for limiting sending rate.
76 * @author Naoki Shiota
77 *
78 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080079 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080080 private class SwitchQueue extends ArrayDeque<OFMessage> {
81 QueueState state;
82
Naoki Shiotae3199732013-11-25 16:14:43 -080083 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080084 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070085 long last_sent_time = 0;
86 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080087
Naoki Shiotae3199732013-11-25 16:14:43 -080088 // "To be deleted" flag
89 boolean toBeDeleted = false;
90
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091 /**
92 * Check if sending rate is within the rate
93 * @param current Current time
94 * @return true if within the rate
95 */
96 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080097 if (max_rate == 0) {
98 // no limitation
99 return true;
100 }
101
Naoki Shiota81dbe302013-11-21 15:35:38 -0800102 if (current == last_sent_time) {
103 return false;
104 }
105
Naoki Shiotac1601d32013-11-20 10:47:34 -0800106 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotae3199732013-11-25 16:14:43 -0800107 long interval = (current - last_sent_time) / 1000000;
108 long rate = last_sent_size / interval;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800109 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800110 }
111
Naoki Shiota81dbe302013-11-21 15:35:38 -0800112 /**
113 * Log time and size of last sent data.
114 * @param current Time to be sent.
115 * @param size Size of sent data (in bytes).
116 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800117 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800118 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800119 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800120 }
121
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700122 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123
Naoki Shiotac1601d32013-11-20 10:47:34 -0800124 private OFMessageDamper messageDamper = null;
125 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700126
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800127 private FloodlightContext context = null;
128 private BasicFactory factory = null;
Naoki Shiota81dbe302013-11-21 15:35:38 -0800129 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800130 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
131 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800132
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800133 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800134
Naoki Shiota8739faa2013-11-18 17:00:25 -0800135 /**
136 * Main thread that reads messages from queues and sends them to switches.
137 * @author Naoki Shiota
138 *
139 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800140 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800141 private Map<IOFSwitch,SwitchQueue> queues
142 = new HashMap<IOFSwitch,SwitchQueue>();
143
Naoki Shiota81dbe302013-11-21 15:35:38 -0800144 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800145
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700146 @Override
147 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800148 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700149
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700150 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800151 try {
152 // wait for message pushed to queue
153 mutex.acquire();
154 } catch (InterruptedException e) {
155 e.printStackTrace();
156 log.debug("FlowPusherThread is interrupted");
157 return;
158 }
159
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800160 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
161 synchronized (queues) {
162 entries = queues.entrySet();
163 }
164
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800165 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800166 IOFSwitch sw = entry.getKey();
167 SwitchQueue queue = entry.getValue();
168
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700169 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800170 if (sw == null || queue == null ||
171 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700172 continue;
173 }
174
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800175 // check sending rate and determine it to be sent or not
176 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800177 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800178
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800179 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800180 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800181 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800182 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800183 // Number of messages excess the limit
Naoki Shiota81dbe302013-11-21 15:35:38 -0800184 if (i >= MAX_MESSAGE_SEND) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800185 // Messages remains in queue
Naoki Shiota81dbe302013-11-21 15:35:38 -0800186 mutex.release();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800187 break;
188 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800189 ++i;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800190
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800191 OFMessage msg = queue.poll();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800192 try {
193 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800194 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800195 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800196 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800197 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800198 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800199 }
200 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800201 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800202 queue.logSentData(current_time, size);
Naoki Shiota81dbe302013-11-21 15:35:38 -0800203
Naoki Shiotae3199732013-11-25 16:14:43 -0800204 if (queue.isEmpty()) {
205 if (barrierIfEmpty) {
206 barrier(sw);
207 }
208 if (queue.toBeDeleted) {
209 synchronized (queues) {
210 queues.remove(sw);
211 }
212 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800213 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700214 }
215 }
216 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700217 }
218 }
219 }
220
Naoki Shiotac1601d32013-11-20 10:47:34 -0800221 /**
222 * Initialize object with one thread.
223 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800224 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800225 }
226
Naoki Shiotac1601d32013-11-20 10:47:34 -0800227 /**
228 * Initialize object with threads of given number.
229 * @param number_thread Number of threads to handle messages.
230 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800231 public FlowPusher(int number_thread) {
232 this.number_thread = number_thread;
233 }
234
Naoki Shiotac1601d32013-11-20 10:47:34 -0800235 /**
236 * Set parameters needed for sending messages.
237 * @param context FloodlightContext used for sending messages.
238 * If null, FlowPusher uses default context.
239 * @param modContext FloodlightModuleContext used for acquiring
240 * ThreadPoolService and registering MessageListener.
241 * @param factory Factory object to create OFMessage objects.
242 * @param damper Message damper used for sending messages.
243 * If null, FlowPusher creates its own damper object.
244 */
245 public void init(FloodlightContext context,
246 FloodlightModuleContext modContext,
247 BasicFactory factory,
248 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700249 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800250 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800251 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
252 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
253 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800254
255 if (damper != null) {
256 messageDamper = damper;
257 } else {
258 // use default value
259 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
260 EnumSet.of(OFType.FLOW_MOD),
261 OFMESSAGE_DAMPER_TIMEOUT);
262 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700263 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800264
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800265 /**
266 * Begin processing queue.
267 */
268 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800269 if (factory == null) {
270 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800271 return;
272 }
273
Naoki Shiota81dbe302013-11-21 15:35:38 -0800274 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800275 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800276 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800277
Naoki Shiota81dbe302013-11-21 15:35:38 -0800278 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800279 thread.start();
280 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700281 }
282
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800283 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800284 * Suspend sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800285 * @param sw
286 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800287 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800288 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800289 SwitchQueue queue = getQueue(sw);
290
291 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800292 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800293 }
294
295 synchronized (queue) {
296 if (queue.state == QueueState.READY) {
297 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800298 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800299 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800300 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800301 }
302 }
303
304 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800305 * Resume sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800306 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800307 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800308 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800309 SwitchQueue queue = getQueue(sw);
310
311 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800312 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 }
314
315 synchronized (queue) {
316 if (queue.state == QueueState.SUSPENDED) {
317 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800318 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800319 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 }
322 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800323
Naoki Shiota8739faa2013-11-18 17:00:25 -0800324 /**
325 * Check if given switch is suspended.
326 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800327 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800328 public boolean isSuspended(IOFSwitch sw) {
329 SwitchQueue queue = getQueue(sw);
330
331 if (queue == null) {
332 // TODO Is true suitable for this case?
333 return true;
334 }
335
336 return (queue.state == QueueState.SUSPENDED);
337 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800338
339 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800340 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800341 */
342 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800343 if (threadMap == null) {
344 return;
345 }
346
Naoki Shiota81dbe302013-11-21 15:35:38 -0800347 for (FlowPusherThread t : threadMap.values()) {
348 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800349 }
350 }
351
Naoki Shiotae3199732013-11-25 16:14:43 -0800352 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800353 public void setRate(IOFSwitch sw, long rate) {
354 SwitchQueue queue = getQueue(sw);
355 if (queue == null) {
356 return;
357 }
358
359 if (rate > 0) {
360 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700361 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700362 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800363
364 @Override
365 public boolean createQueue(IOFSwitch sw) {
366 SwitchQueue queue = getQueue(sw);
367 if (queue != null) {
368 return false;
369 }
370
371 FlowPusherThread proc = getProcess(sw);
372 queue = new SwitchQueue();
373 queue.state = QueueState.READY;
374 synchronized (proc) {
375 proc.queues.put(sw, queue);
376 }
377
378 return true;
379 }
380
381 @Override
382 public boolean deleteQueue(IOFSwitch sw) {
383 return deleteQueue(sw, false);
384 }
385
386 @Override
387 public boolean deleteQueue(IOFSwitch sw, boolean force) {
388 FlowPusherThread proc = getProcess(sw);
389
390 if (force) {
391 synchronized (proc.queues) {
392 SwitchQueue queue = proc.queues.remove(sw);
393 if (queue == null) {
394 return false;
395 }
396 }
397 return true;
398 } else {
399 SwitchQueue queue = getQueue(sw);
400 if (queue == null) {
401 return false;
402 }
403 synchronized (queue) {
404 queue.toBeDeleted = true;
405 }
406 return true;
407 }
408 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700409
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800410 /**
Naoki Shiotac1601d32013-11-20 10:47:34 -0800411 * Add OFMessage to queue of the switch.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800412 * @param sw Switch to which message is sent.
413 * @param msg Message to be sent.
414 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800415 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800416 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800417 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800418 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800419 SwitchQueue queue = proc.queues.get(sw);
420
Naoki Shiotae3199732013-11-25 16:14:43 -0800421 // create queue at first call
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800422 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800423 createQueue(sw);
424 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800425 }
426
427 synchronized (queue) {
428 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800429 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800430 }
431
Naoki Shiota81dbe302013-11-21 15:35:38 -0800432 if (proc.mutex.availablePermits() == 0) {
433 proc.mutex.release();
434 }
435
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800436 return true;
437 }
438
439 /**
440 * Create OFMessage from given flow information and add it to the queue.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800441 * @param sw Switch to which message is sent.
442 * @param flowObj FlowPath.
443 * @param flowEntryObj FlowEntry.
444 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800445 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800446 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800447 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800448 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800449 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
450 if (flowEntryIdStr == null)
451 return false;
452 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
453 String userState = flowEntryObj.getUserState();
454 if (userState == null)
455 return false;
456
457 //
458 // Create the Open Flow Flow Modification Entry to push
459 //
460 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
461 long cookie = flowEntryId.value();
462
463 short flowModCommand = OFFlowMod.OFPFC_ADD;
464 if (userState.equals("FE_USER_ADD")) {
465 flowModCommand = OFFlowMod.OFPFC_ADD;
466 } else if (userState.equals("FE_USER_MODIFY")) {
467 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
468 } else if (userState.equals("FE_USER_DELETE")) {
469 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
470 } else {
471 // Unknown user state. Ignore the entry
472 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
473 flowEntryId.toString(), userState);
474 return false;
475 }
476
477 //
478 // Fetch the match conditions.
479 //
480 // NOTE: The Flow matching conditions common for all Flow Entries are
481 // used ONLY if a Flow Entry does NOT have the corresponding matching
482 // condition set.
483 //
484 OFMatch match = new OFMatch();
485 match.setWildcards(OFMatch.OFPFW_ALL);
486
487 // Match the Incoming Port
488 Short matchInPort = flowEntryObj.getMatchInPort();
489 if (matchInPort != null) {
490 match.setInputPort(matchInPort);
491 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
492 }
493
494 // Match the Source MAC address
495 String matchSrcMac = flowEntryObj.getMatchSrcMac();
496 if (matchSrcMac == null)
497 matchSrcMac = flowObj.getMatchSrcMac();
498 if (matchSrcMac != null) {
499 match.setDataLayerSource(matchSrcMac);
500 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
501 }
502
503 // Match the Destination MAC address
504 String matchDstMac = flowEntryObj.getMatchDstMac();
505 if (matchDstMac == null)
506 matchDstMac = flowObj.getMatchDstMac();
507 if (matchDstMac != null) {
508 match.setDataLayerDestination(matchDstMac);
509 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
510 }
511
512 // Match the Ethernet Frame Type
513 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
514 if (matchEthernetFrameType == null)
515 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
516 if (matchEthernetFrameType != null) {
517 match.setDataLayerType(matchEthernetFrameType);
518 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
519 }
520
521 // Match the VLAN ID
522 Short matchVlanId = flowEntryObj.getMatchVlanId();
523 if (matchVlanId == null)
524 matchVlanId = flowObj.getMatchVlanId();
525 if (matchVlanId != null) {
526 match.setDataLayerVirtualLan(matchVlanId);
527 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
528 }
529
530 // Match the VLAN priority
531 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
532 if (matchVlanPriority == null)
533 matchVlanPriority = flowObj.getMatchVlanPriority();
534 if (matchVlanPriority != null) {
535 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
536 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
537 }
538
539 // Match the Source IPv4 Network prefix
540 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
541 if (matchSrcIPv4Net == null)
542 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
543 if (matchSrcIPv4Net != null) {
544 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
545 }
546
547 // Match the Destination IPv4 Network prefix
548 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
549 if (matchDstIPv4Net == null)
550 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
551 if (matchDstIPv4Net != null) {
552 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
553 }
554
555 // Match the IP protocol
556 Byte matchIpProto = flowEntryObj.getMatchIpProto();
557 if (matchIpProto == null)
558 matchIpProto = flowObj.getMatchIpProto();
559 if (matchIpProto != null) {
560 match.setNetworkProtocol(matchIpProto);
561 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
562 }
563
564 // Match the IP ToS (DSCP field, 6 bits)
565 Byte matchIpToS = flowEntryObj.getMatchIpToS();
566 if (matchIpToS == null)
567 matchIpToS = flowObj.getMatchIpToS();
568 if (matchIpToS != null) {
569 match.setNetworkTypeOfService(matchIpToS);
570 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
571 }
572
573 // Match the Source TCP/UDP port
574 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
575 if (matchSrcTcpUdpPort == null)
576 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
577 if (matchSrcTcpUdpPort != null) {
578 match.setTransportSource(matchSrcTcpUdpPort);
579 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
580 }
581
582 // Match the Destination TCP/UDP port
583 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
584 if (matchDstTcpUdpPort == null)
585 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
586 if (matchDstTcpUdpPort != null) {
587 match.setTransportDestination(matchDstTcpUdpPort);
588 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
589 }
590
591 //
592 // Fetch the actions
593 //
594 Short actionOutputPort = null;
595 List<OFAction> openFlowActions = new ArrayList<OFAction>();
596 int actionsLen = 0;
597 FlowEntryActions flowEntryActions = null;
598 String actionsStr = flowEntryObj.getActions();
599 if (actionsStr != null)
600 flowEntryActions = new FlowEntryActions(actionsStr);
601 else
602 flowEntryActions = new FlowEntryActions();
603 for (FlowEntryAction action : flowEntryActions.actions()) {
604 ActionOutput actionOutput = action.actionOutput();
605 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
606 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
607 ActionStripVlan actionStripVlan = action.actionStripVlan();
608 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
609 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
610 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
611 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
612 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
613 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
614 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
615 ActionEnqueue actionEnqueue = action.actionEnqueue();
616
617 if (actionOutput != null) {
618 actionOutputPort = actionOutput.port().value();
619 // XXX: The max length is hard-coded for now
620 OFActionOutput ofa =
621 new OFActionOutput(actionOutput.port().value(),
622 (short)0xffff);
623 openFlowActions.add(ofa);
624 actionsLen += ofa.getLength();
625 }
626
627 if (actionSetVlanId != null) {
628 OFActionVirtualLanIdentifier ofa =
629 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
630 openFlowActions.add(ofa);
631 actionsLen += ofa.getLength();
632 }
633
634 if (actionSetVlanPriority != null) {
635 OFActionVirtualLanPriorityCodePoint ofa =
636 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
637 openFlowActions.add(ofa);
638 actionsLen += ofa.getLength();
639 }
640
641 if (actionStripVlan != null) {
642 if (actionStripVlan.stripVlan() == true) {
643 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
644 openFlowActions.add(ofa);
645 actionsLen += ofa.getLength();
646 }
647 }
648
649 if (actionSetEthernetSrcAddr != null) {
650 OFActionDataLayerSource ofa =
651 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
652 openFlowActions.add(ofa);
653 actionsLen += ofa.getLength();
654 }
655
656 if (actionSetEthernetDstAddr != null) {
657 OFActionDataLayerDestination ofa =
658 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
659 openFlowActions.add(ofa);
660 actionsLen += ofa.getLength();
661 }
662
663 if (actionSetIPv4SrcAddr != null) {
664 OFActionNetworkLayerSource ofa =
665 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
666 openFlowActions.add(ofa);
667 actionsLen += ofa.getLength();
668 }
669
670 if (actionSetIPv4DstAddr != null) {
671 OFActionNetworkLayerDestination ofa =
672 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
673 openFlowActions.add(ofa);
674 actionsLen += ofa.getLength();
675 }
676
677 if (actionSetIpToS != null) {
678 OFActionNetworkTypeOfService ofa =
679 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
680 openFlowActions.add(ofa);
681 actionsLen += ofa.getLength();
682 }
683
684 if (actionSetTcpUdpSrcPort != null) {
685 OFActionTransportLayerSource ofa =
686 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
687 openFlowActions.add(ofa);
688 actionsLen += ofa.getLength();
689 }
690
691 if (actionSetTcpUdpDstPort != null) {
692 OFActionTransportLayerDestination ofa =
693 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
694 openFlowActions.add(ofa);
695 actionsLen += ofa.getLength();
696 }
697
698 if (actionEnqueue != null) {
699 OFActionEnqueue ofa =
700 new OFActionEnqueue(actionEnqueue.port().value(),
701 actionEnqueue.queueId());
702 openFlowActions.add(ofa);
703 actionsLen += ofa.getLength();
704 }
705 }
706
707 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
708 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
709 .setPriority(PRIORITY_DEFAULT)
710 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
711 .setCookie(cookie)
712 .setCommand(flowModCommand)
713 .setMatch(match)
714 .setActions(openFlowActions)
715 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
716 fm.setOutPort(OFPort.OFPP_NONE.getValue());
717 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
718 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
719 if (actionOutputPort != null)
720 fm.setOutPort(actionOutputPort);
721 }
722
723 //
724 // TODO: Set the following flag
725 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
726 // See method ForwardingBase::pushRoute()
727 //
728
729 //
730 // Write the message to the switch
731 //
732 log.debug("MEASUREMENT: Installing flow entry " + userState +
733 " into switch DPID: " +
734 sw.getStringId() +
735 " flowEntryId: " + flowEntryId.toString() +
736 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
737 " inPort: " + matchInPort + " outPort: " + actionOutputPort
738 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800739 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800740 //
741 // TODO: We should use the OpenFlow Barrier mechanism
742 // to check for errors, and update the SwitchState
743 // for a flow entry after the Barrier message is
744 // is received.
745 //
746 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
747
748 return true;
749 }
750
Naoki Shiota8739faa2013-11-18 17:00:25 -0800751 /**
752 * Create OFMessage from given flow information and add it to the queue.
753 * @param sw Switch to which message is sent.
754 * @param flowPath FlowPath.
755 * @param flowEntry FlowEntry.
Naoki Shiotae3199732013-11-25 16:14:43 -0800756 * @return true if succeed.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800757 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800758 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800759 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
760 //
761 // Create the OpenFlow Flow Modification Entry to push
762 //
763 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
764 long cookie = flowEntry.flowEntryId().value();
765
766 short flowModCommand = OFFlowMod.OFPFC_ADD;
767 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
768 flowModCommand = OFFlowMod.OFPFC_ADD;
769 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
770 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
771 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
772 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
773 } else {
774 // Unknown user state. Ignore the entry
775 log.debug(
776 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
777 flowEntry.flowEntryId().toString(),
778 flowEntry.flowEntryUserState());
779 return false;
780 }
781
782 //
783 // Fetch the match conditions.
784 //
785 // NOTE: The Flow matching conditions common for all Flow Entries are
786 // used ONLY if a Flow Entry does NOT have the corresponding matching
787 // condition set.
788 //
789 OFMatch match = new OFMatch();
790 match.setWildcards(OFMatch.OFPFW_ALL);
791 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
792 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
793
794 // Match the Incoming Port
795 Port matchInPort = flowEntryMatch.inPort();
796 if (matchInPort != null) {
797 match.setInputPort(matchInPort.value());
798 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
799 }
800
801 // Match the Source MAC address
802 MACAddress matchSrcMac = flowEntryMatch.srcMac();
803 if ((matchSrcMac == null) && (flowPathMatch != null)) {
804 matchSrcMac = flowPathMatch.srcMac();
805 }
806 if (matchSrcMac != null) {
807 match.setDataLayerSource(matchSrcMac.toString());
808 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
809 }
810
811 // Match the Destination MAC address
812 MACAddress matchDstMac = flowEntryMatch.dstMac();
813 if ((matchDstMac == null) && (flowPathMatch != null)) {
814 matchDstMac = flowPathMatch.dstMac();
815 }
816 if (matchDstMac != null) {
817 match.setDataLayerDestination(matchDstMac.toString());
818 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
819 }
820
821 // Match the Ethernet Frame Type
822 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
823 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
824 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
825 }
826 if (matchEthernetFrameType != null) {
827 match.setDataLayerType(matchEthernetFrameType);
828 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
829 }
830
831 // Match the VLAN ID
832 Short matchVlanId = flowEntryMatch.vlanId();
833 if ((matchVlanId == null) && (flowPathMatch != null)) {
834 matchVlanId = flowPathMatch.vlanId();
835 }
836 if (matchVlanId != null) {
837 match.setDataLayerVirtualLan(matchVlanId);
838 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
839 }
840
841 // Match the VLAN priority
842 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
843 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
844 matchVlanPriority = flowPathMatch.vlanPriority();
845 }
846 if (matchVlanPriority != null) {
847 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
848 match.setWildcards(match.getWildcards()
849 & ~OFMatch.OFPFW_DL_VLAN_PCP);
850 }
851
852 // Match the Source IPv4 Network prefix
853 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
854 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
855 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
856 }
857 if (matchSrcIPv4Net != null) {
858 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
859 }
860
861 // Natch the Destination IPv4 Network prefix
862 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
863 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
864 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
865 }
866 if (matchDstIPv4Net != null) {
867 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
868 }
869
870 // Match the IP protocol
871 Byte matchIpProto = flowEntryMatch.ipProto();
872 if ((matchIpProto == null) && (flowPathMatch != null)) {
873 matchIpProto = flowPathMatch.ipProto();
874 }
875 if (matchIpProto != null) {
876 match.setNetworkProtocol(matchIpProto);
877 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
878 }
879
880 // Match the IP ToS (DSCP field, 6 bits)
881 Byte matchIpToS = flowEntryMatch.ipToS();
882 if ((matchIpToS == null) && (flowPathMatch != null)) {
883 matchIpToS = flowPathMatch.ipToS();
884 }
885 if (matchIpToS != null) {
886 match.setNetworkTypeOfService(matchIpToS);
887 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
888 }
889
890 // Match the Source TCP/UDP port
891 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
892 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
893 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
894 }
895 if (matchSrcTcpUdpPort != null) {
896 match.setTransportSource(matchSrcTcpUdpPort);
897 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
898 }
899
900 // Match the Destination TCP/UDP port
901 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
902 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
903 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
904 }
905 if (matchDstTcpUdpPort != null) {
906 match.setTransportDestination(matchDstTcpUdpPort);
907 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
908 }
909
910 //
911 // Fetch the actions
912 //
913 Short actionOutputPort = null;
914 List<OFAction> openFlowActions = new ArrayList<OFAction>();
915 int actionsLen = 0;
916 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
917 //
918 for (FlowEntryAction action : flowEntryActions.actions()) {
919 ActionOutput actionOutput = action.actionOutput();
920 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
921 ActionSetVlanPriority actionSetVlanPriority = action
922 .actionSetVlanPriority();
923 ActionStripVlan actionStripVlan = action.actionStripVlan();
924 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
925 .actionSetEthernetSrcAddr();
926 ActionSetEthernetAddr actionSetEthernetDstAddr = action
927 .actionSetEthernetDstAddr();
928 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
929 .actionSetIPv4SrcAddr();
930 ActionSetIPv4Addr actionSetIPv4DstAddr = action
931 .actionSetIPv4DstAddr();
932 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
933 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
934 .actionSetTcpUdpSrcPort();
935 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
936 .actionSetTcpUdpDstPort();
937 ActionEnqueue actionEnqueue = action.actionEnqueue();
938
939 if (actionOutput != null) {
940 actionOutputPort = actionOutput.port().value();
941 // XXX: The max length is hard-coded for now
942 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
943 .value(), (short) 0xffff);
944 openFlowActions.add(ofa);
945 actionsLen += ofa.getLength();
946 }
947
948 if (actionSetVlanId != null) {
949 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
950 actionSetVlanId.vlanId());
951 openFlowActions.add(ofa);
952 actionsLen += ofa.getLength();
953 }
954
955 if (actionSetVlanPriority != null) {
956 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
957 actionSetVlanPriority.vlanPriority());
958 openFlowActions.add(ofa);
959 actionsLen += ofa.getLength();
960 }
961
962 if (actionStripVlan != null) {
963 if (actionStripVlan.stripVlan() == true) {
964 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
965 openFlowActions.add(ofa);
966 actionsLen += ofa.getLength();
967 }
968 }
969
970 if (actionSetEthernetSrcAddr != null) {
971 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
972 actionSetEthernetSrcAddr.addr().toBytes());
973 openFlowActions.add(ofa);
974 actionsLen += ofa.getLength();
975 }
976
977 if (actionSetEthernetDstAddr != null) {
978 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
979 actionSetEthernetDstAddr.addr().toBytes());
980 openFlowActions.add(ofa);
981 actionsLen += ofa.getLength();
982 }
983
984 if (actionSetIPv4SrcAddr != null) {
985 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
986 actionSetIPv4SrcAddr.addr().value());
987 openFlowActions.add(ofa);
988 actionsLen += ofa.getLength();
989 }
990
991 if (actionSetIPv4DstAddr != null) {
992 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
993 actionSetIPv4DstAddr.addr().value());
994 openFlowActions.add(ofa);
995 actionsLen += ofa.getLength();
996 }
997
998 if (actionSetIpToS != null) {
999 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
1000 actionSetIpToS.ipToS());
1001 openFlowActions.add(ofa);
1002 actionsLen += ofa.getLength();
1003 }
1004
1005 if (actionSetTcpUdpSrcPort != null) {
1006 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
1007 actionSetTcpUdpSrcPort.port());
1008 openFlowActions.add(ofa);
1009 actionsLen += ofa.getLength();
1010 }
1011
1012 if (actionSetTcpUdpDstPort != null) {
1013 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
1014 actionSetTcpUdpDstPort.port());
1015 openFlowActions.add(ofa);
1016 actionsLen += ofa.getLength();
1017 }
1018
1019 if (actionEnqueue != null) {
1020 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
1021 .value(), actionEnqueue.queueId());
1022 openFlowActions.add(ofa);
1023 actionsLen += ofa.getLength();
1024 }
1025 }
1026
1027 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
1028 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
1029 .setPriority(PRIORITY_DEFAULT)
1030 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
1031 .setCommand(flowModCommand).setMatch(match)
1032 .setActions(openFlowActions)
1033 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
1034 fm.setOutPort(OFPort.OFPP_NONE.getValue());
1035 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
1036 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
1037 if (actionOutputPort != null)
1038 fm.setOutPort(actionOutputPort);
1039 }
1040
1041 //
1042 // TODO: Set the following flag
1043 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1044 // See method ForwardingBase::pushRoute()
1045 //
1046
1047 //
1048 // Write the message to the switch
1049 //
1050 log.debug("MEASUREMENT: Installing flow entry "
1051 + flowEntry.flowEntryUserState() + " into switch DPID: "
1052 + sw.getStringId() + " flowEntryId: "
1053 + flowEntry.flowEntryId().toString() + " srcMac: "
1054 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1055 + matchInPort + " outPort: " + actionOutputPort);
1056
1057 //
1058 // TODO: We should use the OpenFlow Barrier mechanism
1059 // to check for errors, and update the SwitchState
1060 // for a flow entry after the Barrier message is
1061 // is received.
1062 //
1063 // TODO: The FlowEntry Object in Titan should be set
1064 // to FE_SWITCH_UPDATED.
1065 //
1066
1067 return add(sw,fm);
1068 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001069
1070 @Override
1071 public OFBarrierReply barrier(IOFSwitch sw) {
1072 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1073 if (future == null) {
1074 return null;
1075 }
1076
1077 try {
1078 return future.get();
1079 } catch (InterruptedException e) {
1080 e.printStackTrace();
1081 log.error("InterruptedException: {}", e);
1082 return null;
1083 } catch (ExecutionException e) {
1084 e.printStackTrace();
1085 log.error("ExecutionException: {}", e);
1086 return null;
1087 }
1088 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001089
Naoki Shiotac1601d32013-11-20 10:47:34 -08001090 @Override
1091 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1092 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001093
1094 if (sw == null) {
1095 return null;
1096 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001097
Naoki Shiota81dbe302013-11-21 15:35:38 -08001098 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001099 msg.setXid(sw.getNextTransactionId());
1100 add(sw, msg);
1101
1102 // TODO create Future object of message
1103 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiota81dbe302013-11-21 15:35:38 -08001104
Naoki Shiotac1601d32013-11-20 10:47:34 -08001105 synchronized (barrierFutures) {
1106 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1107 if (map == null) {
1108 map = new HashMap<Integer,OFBarrierReplyFuture>();
1109 barrierFutures.put(sw.getId(), map);
1110 }
1111 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001112 }
1113
1114 return future;
1115 }
1116
Naoki Shiotae3199732013-11-25 16:14:43 -08001117 /**
1118 * Get a queue attached to a switch.
1119 * @param sw Switch object
1120 * @return Queue object
1121 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001122 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001123 if (sw == null) {
1124 return null;
1125 }
1126
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001127 return getProcess(sw).queues.get(sw);
1128 }
1129
Naoki Shiotae3199732013-11-25 16:14:43 -08001130 /**
1131 * Get a hash value correspondent to a switch.
1132 * @param sw Switch object
1133 * @return Hash value
1134 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001135 protected long getHash(IOFSwitch sw) {
1136 // This code assumes DPID is sequentially assigned.
1137 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001138 return sw.getId() % number_thread;
1139 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001140
1141 /**
1142 * Get a Thread object which processes the queue attached to a switch.
1143 * @param sw Switch object
1144 * @return Thread object
1145 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001146 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001147 long hash = getHash(sw);
1148
1149 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001150 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001151
1152 @Override
1153 public String getName() {
1154 return "flowpusher";
1155 }
1156
1157 @Override
1158 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1159 return false;
1160 }
1161
1162 @Override
1163 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1164 return false;
1165 }
1166
1167 @Override
1168 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001169 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1170 if (map == null) {
1171 return Command.CONTINUE;
1172 }
1173
1174 OFBarrierReplyFuture future = map.get(msg.getXid());
1175 if (future == null) {
1176 return Command.CONTINUE;
1177 }
1178
1179 log.debug("Received BARRIER_REPLY : {}", msg);
1180 future.deliverFuture(sw, msg);
1181
1182 return Command.CONTINUE;
1183 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001184
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001185}