blob: 6379f2681024516def1b77864f4750d7c4620a1b [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080011import java.util.concurrent.ExecutionException;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012
Naoki Shiota7d0cf272013-11-05 10:18:12 -080013import org.openflow.protocol.*;
14import org.openflow.protocol.action.*;
15import org.openflow.protocol.factory.BasicFactory;
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070018
19import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080020import net.floodlightcontroller.core.IFloodlightProviderService;
21import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070022import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080023import net.floodlightcontroller.core.internal.OFMessageFuture;
24import net.floodlightcontroller.core.module.FloodlightModuleContext;
25import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080026import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080027import net.floodlightcontroller.util.OFMessageDamper;
28import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
29import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
30import net.onrc.onos.ofcontroller.util.FlowEntryAction;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080032import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryActions;
34import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
36import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
37import net.onrc.onos.ofcontroller.util.FlowPath;
38import net.onrc.onos.ofcontroller.util.IPv4Net;
39import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070040
41/**
Naoki Shiota8739faa2013-11-18 17:00:25 -080042 * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
43 * messages to switches in proper rate.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070044 * @author Naoki Shiota
45 *
46 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080047public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080048 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
49
50 // NOTE: Below are moved from FlowManager.
51 // TODO: Values copied from elsewhere (class LearningSwitch).
52 // The local copy should go away!
53 //
54 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
55 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080056
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080057 // Interval of sleep when queue is empty
58 protected static final long SLEEP_MILLI_SEC = 10;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080059 protected static final int SLEEP_NANO_SEC = 0;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080060
61 // Number of messages sent to switch at once
62 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080063
64 public static final short PRIORITY_DEFAULT = 100;
65 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
66 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
67
68 public enum QueueState {
69 READY,
70 SUSPENDED,
71 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072
Naoki Shiotac1601d32013-11-20 10:47:34 -080073 /**
74 * Message queue attached to a switch.
75 * This consists of queue itself and variables used for limiting sending rate.
76 * @author Naoki Shiota
77 *
78 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080079 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080080 private class SwitchQueue extends ArrayDeque<OFMessage> {
81 QueueState state;
82
83 // Max rate of sending message (bytes/sec). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080084 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070085 long last_sent_time = 0;
86 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080087
88 /**
89 * Check if sending rate is within the rate
90 * @param current Current time
91 * @return true if within the rate
92 */
93 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080094 if (max_rate == 0) {
95 // no limitation
96 return true;
97 }
98
Naoki Shiotac1601d32013-11-20 10:47:34 -080099 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800100 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800101 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800102 }
103
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800104 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800105 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800106 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107 }
108
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700109 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800110
Naoki Shiotac1601d32013-11-20 10:47:34 -0800111 private OFMessageDamper messageDamper = null;
112 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700113
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800114 private FloodlightContext context = null;
115 private BasicFactory factory = null;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800116 private Map<Long, FlowPusherProcess> threadMap = null;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800117 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
118 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800119
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800120 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121
Naoki Shiota8739faa2013-11-18 17:00:25 -0800122 /**
123 * Main thread that reads messages from queues and sends them to switches.
124 * @author Naoki Shiota
125 *
126 */
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700127 private class FlowPusherProcess implements Runnable {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800128 private Map<IOFSwitch,SwitchQueue> queues
129 = new HashMap<IOFSwitch,SwitchQueue>();
130
131 private boolean isStopped = false;
132 private boolean isMsgAdded = false;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800133
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700134 @Override
135 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800136 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700137
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700138 while (true) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800139 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
140 synchronized (queues) {
141 entries = queues.entrySet();
142 }
143
144 // Set taint flag to false at this moment.
145 isMsgAdded = false;
146
147 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800148 IOFSwitch sw = entry.getKey();
149 SwitchQueue queue = entry.getValue();
150
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700151 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800152 if (sw == null || queue == null ||
153 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700154 continue;
155 }
156
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800157 // check sending rate and determine it to be sent or not
158 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800159 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800160
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800161 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800162 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800163 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800164 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800165 // Number of messages excess the limit
166 if (++i >= MAX_MESSAGE_SEND) {
167 // Messages remains in queue
168 isMsgAdded = true;
169 break;
170 }
171
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800172 OFMessage msg = queue.poll();
173
174 // if need to send, call IOFSwitch#write()
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800175 try {
176 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800177 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800178 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800179 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800180 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800181 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 }
183 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800184 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800185 queue.logSentData(current_time, size);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700186 }
187 }
188 }
189
190 // sleep while all queues are empty
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800191 while (! (isMsgAdded || isStopped)) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800192 try {
193 Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
194 } catch (InterruptedException e) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800195 e.printStackTrace();
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800196 log.error("Thread.sleep failed");
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800197 }
198 }
199
200 log.debug("Exit sleep loop.");
201
202 if (isStopped) {
203 log.debug("Pusher Process finished.");
204 return;
205 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800206
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700207 }
208 }
209 }
210
Naoki Shiotac1601d32013-11-20 10:47:34 -0800211 /**
212 * Initialize object with one thread.
213 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800214 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800215 }
216
Naoki Shiotac1601d32013-11-20 10:47:34 -0800217 /**
218 * Initialize object with threads of given number.
219 * @param number_thread Number of threads to handle messages.
220 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800221 public FlowPusher(int number_thread) {
222 this.number_thread = number_thread;
223 }
224
Naoki Shiotac1601d32013-11-20 10:47:34 -0800225 /**
226 * Set parameters needed for sending messages.
227 * @param context FloodlightContext used for sending messages.
228 * If null, FlowPusher uses default context.
229 * @param modContext FloodlightModuleContext used for acquiring
230 * ThreadPoolService and registering MessageListener.
231 * @param factory Factory object to create OFMessage objects.
232 * @param damper Message damper used for sending messages.
233 * If null, FlowPusher creates its own damper object.
234 */
235 public void init(FloodlightContext context,
236 FloodlightModuleContext modContext,
237 BasicFactory factory,
238 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700239 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800240 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800241 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
242 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
243 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800244
245 if (damper != null) {
246 messageDamper = damper;
247 } else {
248 // use default value
249 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
250 EnumSet.of(OFType.FLOW_MOD),
251 OFMESSAGE_DAMPER_TIMEOUT);
252 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700253 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800254
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800255 /**
256 * Begin processing queue.
257 */
258 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800259 if (factory == null) {
260 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800261 return;
262 }
263
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800264 threadMap = new HashMap<Long,FlowPusherProcess>();
265 for (long i = 0; i < number_thread; ++i) {
266 FlowPusherProcess runnable = new FlowPusherProcess();
267 threadMap.put(i, runnable);
268
269 Thread thread = new Thread(runnable);
270 thread.start();
271 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700272 }
273
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800274 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800275 * Suspend sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800276 * @param sw
277 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800278 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800279 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800280 SwitchQueue queue = getQueue(sw);
281
282 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800283 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284 }
285
286 synchronized (queue) {
287 if (queue.state == QueueState.READY) {
288 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800289 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800291 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800292 }
293 }
294
295 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800296 * Resume sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800297 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800298 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800299 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800300 SwitchQueue queue = getQueue(sw);
301
302 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800303 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800304 }
305
306 synchronized (queue) {
307 if (queue.state == QueueState.SUSPENDED) {
308 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800309 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800310 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800311 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800312 }
313 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800314
Naoki Shiota8739faa2013-11-18 17:00:25 -0800315 /**
316 * Check if given switch is suspended.
317 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800318 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800319 public boolean isSuspended(IOFSwitch sw) {
320 SwitchQueue queue = getQueue(sw);
321
322 if (queue == null) {
323 // TODO Is true suitable for this case?
324 return true;
325 }
326
327 return (queue.state == QueueState.SUSPENDED);
328 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800329
330 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800331 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800332 */
333 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800334 if (threadMap == null) {
335 return;
336 }
337
338 for (FlowPusherProcess runnable : threadMap.values()) {
339 if (! runnable.isStopped) {
340 runnable.isStopped = true;
341 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800342 }
343 }
344
Naoki Shiota8739faa2013-11-18 17:00:25 -0800345 /**
346 * Set sending rate to a switch.
347 * @param sw Switch.
348 * @param rate Rate in bytes/sec.
349 */
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800350 public void setRate(IOFSwitch sw, long rate) {
351 SwitchQueue queue = getQueue(sw);
352 if (queue == null) {
353 return;
354 }
355
356 if (rate > 0) {
357 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700358 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700359 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700360
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800361 /**
Naoki Shiotac1601d32013-11-20 10:47:34 -0800362 * Add OFMessage to queue of the switch.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800363 * @param sw Switch to which message is sent.
364 * @param msg Message to be sent.
365 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800366 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800367 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800368 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800369 FlowPusherProcess proc = getProcess(sw);
370 SwitchQueue queue = proc.queues.get(sw);
371
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800372 if (queue == null) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800373 queue = new SwitchQueue();
374 queue.state = QueueState.READY;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800375 synchronized (proc) {
376 proc.queues.put(sw, queue);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800377 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800378 }
379
380 synchronized (queue) {
381 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800382 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800383 }
384
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800385 proc.isMsgAdded = true;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800386
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800387 return true;
388 }
389
390 /**
391 * Create OFMessage from given flow information and add it to the queue.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800392 * @param sw Switch to which message is sent.
393 * @param flowObj FlowPath.
394 * @param flowEntryObj FlowEntry.
395 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800396 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800397 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800398 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800399 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800400 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
401 if (flowEntryIdStr == null)
402 return false;
403 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
404 String userState = flowEntryObj.getUserState();
405 if (userState == null)
406 return false;
407
408 //
409 // Create the Open Flow Flow Modification Entry to push
410 //
411 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
412 long cookie = flowEntryId.value();
413
414 short flowModCommand = OFFlowMod.OFPFC_ADD;
415 if (userState.equals("FE_USER_ADD")) {
416 flowModCommand = OFFlowMod.OFPFC_ADD;
417 } else if (userState.equals("FE_USER_MODIFY")) {
418 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
419 } else if (userState.equals("FE_USER_DELETE")) {
420 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
421 } else {
422 // Unknown user state. Ignore the entry
423 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
424 flowEntryId.toString(), userState);
425 return false;
426 }
427
428 //
429 // Fetch the match conditions.
430 //
431 // NOTE: The Flow matching conditions common for all Flow Entries are
432 // used ONLY if a Flow Entry does NOT have the corresponding matching
433 // condition set.
434 //
435 OFMatch match = new OFMatch();
436 match.setWildcards(OFMatch.OFPFW_ALL);
437
438 // Match the Incoming Port
439 Short matchInPort = flowEntryObj.getMatchInPort();
440 if (matchInPort != null) {
441 match.setInputPort(matchInPort);
442 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
443 }
444
445 // Match the Source MAC address
446 String matchSrcMac = flowEntryObj.getMatchSrcMac();
447 if (matchSrcMac == null)
448 matchSrcMac = flowObj.getMatchSrcMac();
449 if (matchSrcMac != null) {
450 match.setDataLayerSource(matchSrcMac);
451 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
452 }
453
454 // Match the Destination MAC address
455 String matchDstMac = flowEntryObj.getMatchDstMac();
456 if (matchDstMac == null)
457 matchDstMac = flowObj.getMatchDstMac();
458 if (matchDstMac != null) {
459 match.setDataLayerDestination(matchDstMac);
460 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
461 }
462
463 // Match the Ethernet Frame Type
464 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
465 if (matchEthernetFrameType == null)
466 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
467 if (matchEthernetFrameType != null) {
468 match.setDataLayerType(matchEthernetFrameType);
469 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
470 }
471
472 // Match the VLAN ID
473 Short matchVlanId = flowEntryObj.getMatchVlanId();
474 if (matchVlanId == null)
475 matchVlanId = flowObj.getMatchVlanId();
476 if (matchVlanId != null) {
477 match.setDataLayerVirtualLan(matchVlanId);
478 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
479 }
480
481 // Match the VLAN priority
482 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
483 if (matchVlanPriority == null)
484 matchVlanPriority = flowObj.getMatchVlanPriority();
485 if (matchVlanPriority != null) {
486 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
487 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
488 }
489
490 // Match the Source IPv4 Network prefix
491 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
492 if (matchSrcIPv4Net == null)
493 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
494 if (matchSrcIPv4Net != null) {
495 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
496 }
497
498 // Match the Destination IPv4 Network prefix
499 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
500 if (matchDstIPv4Net == null)
501 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
502 if (matchDstIPv4Net != null) {
503 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
504 }
505
506 // Match the IP protocol
507 Byte matchIpProto = flowEntryObj.getMatchIpProto();
508 if (matchIpProto == null)
509 matchIpProto = flowObj.getMatchIpProto();
510 if (matchIpProto != null) {
511 match.setNetworkProtocol(matchIpProto);
512 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
513 }
514
515 // Match the IP ToS (DSCP field, 6 bits)
516 Byte matchIpToS = flowEntryObj.getMatchIpToS();
517 if (matchIpToS == null)
518 matchIpToS = flowObj.getMatchIpToS();
519 if (matchIpToS != null) {
520 match.setNetworkTypeOfService(matchIpToS);
521 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
522 }
523
524 // Match the Source TCP/UDP port
525 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
526 if (matchSrcTcpUdpPort == null)
527 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
528 if (matchSrcTcpUdpPort != null) {
529 match.setTransportSource(matchSrcTcpUdpPort);
530 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
531 }
532
533 // Match the Destination TCP/UDP port
534 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
535 if (matchDstTcpUdpPort == null)
536 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
537 if (matchDstTcpUdpPort != null) {
538 match.setTransportDestination(matchDstTcpUdpPort);
539 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
540 }
541
542 //
543 // Fetch the actions
544 //
545 Short actionOutputPort = null;
546 List<OFAction> openFlowActions = new ArrayList<OFAction>();
547 int actionsLen = 0;
548 FlowEntryActions flowEntryActions = null;
549 String actionsStr = flowEntryObj.getActions();
550 if (actionsStr != null)
551 flowEntryActions = new FlowEntryActions(actionsStr);
552 else
553 flowEntryActions = new FlowEntryActions();
554 for (FlowEntryAction action : flowEntryActions.actions()) {
555 ActionOutput actionOutput = action.actionOutput();
556 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
557 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
558 ActionStripVlan actionStripVlan = action.actionStripVlan();
559 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
560 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
561 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
562 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
563 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
564 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
565 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
566 ActionEnqueue actionEnqueue = action.actionEnqueue();
567
568 if (actionOutput != null) {
569 actionOutputPort = actionOutput.port().value();
570 // XXX: The max length is hard-coded for now
571 OFActionOutput ofa =
572 new OFActionOutput(actionOutput.port().value(),
573 (short)0xffff);
574 openFlowActions.add(ofa);
575 actionsLen += ofa.getLength();
576 }
577
578 if (actionSetVlanId != null) {
579 OFActionVirtualLanIdentifier ofa =
580 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
581 openFlowActions.add(ofa);
582 actionsLen += ofa.getLength();
583 }
584
585 if (actionSetVlanPriority != null) {
586 OFActionVirtualLanPriorityCodePoint ofa =
587 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
588 openFlowActions.add(ofa);
589 actionsLen += ofa.getLength();
590 }
591
592 if (actionStripVlan != null) {
593 if (actionStripVlan.stripVlan() == true) {
594 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
595 openFlowActions.add(ofa);
596 actionsLen += ofa.getLength();
597 }
598 }
599
600 if (actionSetEthernetSrcAddr != null) {
601 OFActionDataLayerSource ofa =
602 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
603 openFlowActions.add(ofa);
604 actionsLen += ofa.getLength();
605 }
606
607 if (actionSetEthernetDstAddr != null) {
608 OFActionDataLayerDestination ofa =
609 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
610 openFlowActions.add(ofa);
611 actionsLen += ofa.getLength();
612 }
613
614 if (actionSetIPv4SrcAddr != null) {
615 OFActionNetworkLayerSource ofa =
616 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
617 openFlowActions.add(ofa);
618 actionsLen += ofa.getLength();
619 }
620
621 if (actionSetIPv4DstAddr != null) {
622 OFActionNetworkLayerDestination ofa =
623 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
624 openFlowActions.add(ofa);
625 actionsLen += ofa.getLength();
626 }
627
628 if (actionSetIpToS != null) {
629 OFActionNetworkTypeOfService ofa =
630 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
631 openFlowActions.add(ofa);
632 actionsLen += ofa.getLength();
633 }
634
635 if (actionSetTcpUdpSrcPort != null) {
636 OFActionTransportLayerSource ofa =
637 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
638 openFlowActions.add(ofa);
639 actionsLen += ofa.getLength();
640 }
641
642 if (actionSetTcpUdpDstPort != null) {
643 OFActionTransportLayerDestination ofa =
644 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
645 openFlowActions.add(ofa);
646 actionsLen += ofa.getLength();
647 }
648
649 if (actionEnqueue != null) {
650 OFActionEnqueue ofa =
651 new OFActionEnqueue(actionEnqueue.port().value(),
652 actionEnqueue.queueId());
653 openFlowActions.add(ofa);
654 actionsLen += ofa.getLength();
655 }
656 }
657
658 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
659 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
660 .setPriority(PRIORITY_DEFAULT)
661 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
662 .setCookie(cookie)
663 .setCommand(flowModCommand)
664 .setMatch(match)
665 .setActions(openFlowActions)
666 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
667 fm.setOutPort(OFPort.OFPP_NONE.getValue());
668 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
669 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
670 if (actionOutputPort != null)
671 fm.setOutPort(actionOutputPort);
672 }
673
674 //
675 // TODO: Set the following flag
676 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
677 // See method ForwardingBase::pushRoute()
678 //
679
680 //
681 // Write the message to the switch
682 //
683 log.debug("MEASUREMENT: Installing flow entry " + userState +
684 " into switch DPID: " +
685 sw.getStringId() +
686 " flowEntryId: " + flowEntryId.toString() +
687 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
688 " inPort: " + matchInPort + " outPort: " + actionOutputPort
689 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800690 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800691 //
692 // TODO: We should use the OpenFlow Barrier mechanism
693 // to check for errors, and update the SwitchState
694 // for a flow entry after the Barrier message is
695 // is received.
696 //
697 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
698
699 return true;
700 }
701
Naoki Shiota8739faa2013-11-18 17:00:25 -0800702 /**
703 * Create OFMessage from given flow information and add it to the queue.
704 * @param sw Switch to which message is sent.
705 * @param flowPath FlowPath.
706 * @param flowEntry FlowEntry.
707 * @return true if secceed.
708 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800709 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800710 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
711 //
712 // Create the OpenFlow Flow Modification Entry to push
713 //
714 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
715 long cookie = flowEntry.flowEntryId().value();
716
717 short flowModCommand = OFFlowMod.OFPFC_ADD;
718 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
719 flowModCommand = OFFlowMod.OFPFC_ADD;
720 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
721 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
722 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
723 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
724 } else {
725 // Unknown user state. Ignore the entry
726 log.debug(
727 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
728 flowEntry.flowEntryId().toString(),
729 flowEntry.flowEntryUserState());
730 return false;
731 }
732
733 //
734 // Fetch the match conditions.
735 //
736 // NOTE: The Flow matching conditions common for all Flow Entries are
737 // used ONLY if a Flow Entry does NOT have the corresponding matching
738 // condition set.
739 //
740 OFMatch match = new OFMatch();
741 match.setWildcards(OFMatch.OFPFW_ALL);
742 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
743 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
744
745 // Match the Incoming Port
746 Port matchInPort = flowEntryMatch.inPort();
747 if (matchInPort != null) {
748 match.setInputPort(matchInPort.value());
749 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
750 }
751
752 // Match the Source MAC address
753 MACAddress matchSrcMac = flowEntryMatch.srcMac();
754 if ((matchSrcMac == null) && (flowPathMatch != null)) {
755 matchSrcMac = flowPathMatch.srcMac();
756 }
757 if (matchSrcMac != null) {
758 match.setDataLayerSource(matchSrcMac.toString());
759 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
760 }
761
762 // Match the Destination MAC address
763 MACAddress matchDstMac = flowEntryMatch.dstMac();
764 if ((matchDstMac == null) && (flowPathMatch != null)) {
765 matchDstMac = flowPathMatch.dstMac();
766 }
767 if (matchDstMac != null) {
768 match.setDataLayerDestination(matchDstMac.toString());
769 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
770 }
771
772 // Match the Ethernet Frame Type
773 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
774 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
775 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
776 }
777 if (matchEthernetFrameType != null) {
778 match.setDataLayerType(matchEthernetFrameType);
779 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
780 }
781
782 // Match the VLAN ID
783 Short matchVlanId = flowEntryMatch.vlanId();
784 if ((matchVlanId == null) && (flowPathMatch != null)) {
785 matchVlanId = flowPathMatch.vlanId();
786 }
787 if (matchVlanId != null) {
788 match.setDataLayerVirtualLan(matchVlanId);
789 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
790 }
791
792 // Match the VLAN priority
793 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
794 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
795 matchVlanPriority = flowPathMatch.vlanPriority();
796 }
797 if (matchVlanPriority != null) {
798 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
799 match.setWildcards(match.getWildcards()
800 & ~OFMatch.OFPFW_DL_VLAN_PCP);
801 }
802
803 // Match the Source IPv4 Network prefix
804 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
805 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
806 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
807 }
808 if (matchSrcIPv4Net != null) {
809 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
810 }
811
812 // Natch the Destination IPv4 Network prefix
813 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
814 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
815 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
816 }
817 if (matchDstIPv4Net != null) {
818 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
819 }
820
821 // Match the IP protocol
822 Byte matchIpProto = flowEntryMatch.ipProto();
823 if ((matchIpProto == null) && (flowPathMatch != null)) {
824 matchIpProto = flowPathMatch.ipProto();
825 }
826 if (matchIpProto != null) {
827 match.setNetworkProtocol(matchIpProto);
828 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
829 }
830
831 // Match the IP ToS (DSCP field, 6 bits)
832 Byte matchIpToS = flowEntryMatch.ipToS();
833 if ((matchIpToS == null) && (flowPathMatch != null)) {
834 matchIpToS = flowPathMatch.ipToS();
835 }
836 if (matchIpToS != null) {
837 match.setNetworkTypeOfService(matchIpToS);
838 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
839 }
840
841 // Match the Source TCP/UDP port
842 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
843 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
844 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
845 }
846 if (matchSrcTcpUdpPort != null) {
847 match.setTransportSource(matchSrcTcpUdpPort);
848 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
849 }
850
851 // Match the Destination TCP/UDP port
852 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
853 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
854 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
855 }
856 if (matchDstTcpUdpPort != null) {
857 match.setTransportDestination(matchDstTcpUdpPort);
858 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
859 }
860
861 //
862 // Fetch the actions
863 //
864 Short actionOutputPort = null;
865 List<OFAction> openFlowActions = new ArrayList<OFAction>();
866 int actionsLen = 0;
867 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
868 //
869 for (FlowEntryAction action : flowEntryActions.actions()) {
870 ActionOutput actionOutput = action.actionOutput();
871 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
872 ActionSetVlanPriority actionSetVlanPriority = action
873 .actionSetVlanPriority();
874 ActionStripVlan actionStripVlan = action.actionStripVlan();
875 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
876 .actionSetEthernetSrcAddr();
877 ActionSetEthernetAddr actionSetEthernetDstAddr = action
878 .actionSetEthernetDstAddr();
879 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
880 .actionSetIPv4SrcAddr();
881 ActionSetIPv4Addr actionSetIPv4DstAddr = action
882 .actionSetIPv4DstAddr();
883 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
884 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
885 .actionSetTcpUdpSrcPort();
886 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
887 .actionSetTcpUdpDstPort();
888 ActionEnqueue actionEnqueue = action.actionEnqueue();
889
890 if (actionOutput != null) {
891 actionOutputPort = actionOutput.port().value();
892 // XXX: The max length is hard-coded for now
893 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
894 .value(), (short) 0xffff);
895 openFlowActions.add(ofa);
896 actionsLen += ofa.getLength();
897 }
898
899 if (actionSetVlanId != null) {
900 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
901 actionSetVlanId.vlanId());
902 openFlowActions.add(ofa);
903 actionsLen += ofa.getLength();
904 }
905
906 if (actionSetVlanPriority != null) {
907 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
908 actionSetVlanPriority.vlanPriority());
909 openFlowActions.add(ofa);
910 actionsLen += ofa.getLength();
911 }
912
913 if (actionStripVlan != null) {
914 if (actionStripVlan.stripVlan() == true) {
915 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
916 openFlowActions.add(ofa);
917 actionsLen += ofa.getLength();
918 }
919 }
920
921 if (actionSetEthernetSrcAddr != null) {
922 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
923 actionSetEthernetSrcAddr.addr().toBytes());
924 openFlowActions.add(ofa);
925 actionsLen += ofa.getLength();
926 }
927
928 if (actionSetEthernetDstAddr != null) {
929 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
930 actionSetEthernetDstAddr.addr().toBytes());
931 openFlowActions.add(ofa);
932 actionsLen += ofa.getLength();
933 }
934
935 if (actionSetIPv4SrcAddr != null) {
936 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
937 actionSetIPv4SrcAddr.addr().value());
938 openFlowActions.add(ofa);
939 actionsLen += ofa.getLength();
940 }
941
942 if (actionSetIPv4DstAddr != null) {
943 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
944 actionSetIPv4DstAddr.addr().value());
945 openFlowActions.add(ofa);
946 actionsLen += ofa.getLength();
947 }
948
949 if (actionSetIpToS != null) {
950 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
951 actionSetIpToS.ipToS());
952 openFlowActions.add(ofa);
953 actionsLen += ofa.getLength();
954 }
955
956 if (actionSetTcpUdpSrcPort != null) {
957 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
958 actionSetTcpUdpSrcPort.port());
959 openFlowActions.add(ofa);
960 actionsLen += ofa.getLength();
961 }
962
963 if (actionSetTcpUdpDstPort != null) {
964 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
965 actionSetTcpUdpDstPort.port());
966 openFlowActions.add(ofa);
967 actionsLen += ofa.getLength();
968 }
969
970 if (actionEnqueue != null) {
971 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
972 .value(), actionEnqueue.queueId());
973 openFlowActions.add(ofa);
974 actionsLen += ofa.getLength();
975 }
976 }
977
978 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
979 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
980 .setPriority(PRIORITY_DEFAULT)
981 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
982 .setCommand(flowModCommand).setMatch(match)
983 .setActions(openFlowActions)
984 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
985 fm.setOutPort(OFPort.OFPP_NONE.getValue());
986 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
987 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
988 if (actionOutputPort != null)
989 fm.setOutPort(actionOutputPort);
990 }
991
992 //
993 // TODO: Set the following flag
994 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
995 // See method ForwardingBase::pushRoute()
996 //
997
998 //
999 // Write the message to the switch
1000 //
1001 log.debug("MEASUREMENT: Installing flow entry "
1002 + flowEntry.flowEntryUserState() + " into switch DPID: "
1003 + sw.getStringId() + " flowEntryId: "
1004 + flowEntry.flowEntryId().toString() + " srcMac: "
1005 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1006 + matchInPort + " outPort: " + actionOutputPort);
1007
1008 //
1009 // TODO: We should use the OpenFlow Barrier mechanism
1010 // to check for errors, and update the SwitchState
1011 // for a flow entry after the Barrier message is
1012 // is received.
1013 //
1014 // TODO: The FlowEntry Object in Titan should be set
1015 // to FE_SWITCH_UPDATED.
1016 //
1017
1018 return add(sw,fm);
1019 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001020
1021 @Override
1022 public OFBarrierReply barrier(IOFSwitch sw) {
1023 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1024 if (future == null) {
1025 return null;
1026 }
1027
1028 try {
1029 return future.get();
1030 } catch (InterruptedException e) {
1031 e.printStackTrace();
1032 log.error("InterruptedException: {}", e);
1033 return null;
1034 } catch (ExecutionException e) {
1035 e.printStackTrace();
1036 log.error("ExecutionException: {}", e);
1037 return null;
1038 }
1039 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001040
Naoki Shiotac1601d32013-11-20 10:47:34 -08001041 @Override
1042 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1043 // TODO creation of message and future should be moved to OFSwitchImpl
1044
1045 OFMessage msg = factory.getMessage(OFType.BARRIER_REQUEST);
1046 msg.setXid(sw.getNextTransactionId());
1047 add(sw, msg);
1048
1049 // TODO create Future object of message
1050 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
1051
1052 synchronized (barrierFutures) {
1053 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1054 if (map == null) {
1055 map = new HashMap<Integer,OFBarrierReplyFuture>();
1056 barrierFutures.put(sw.getId(), map);
1057 }
1058 map.put(msg.getXid(), future);
1059 log.debug("Created future for {}", msg.getXid());
1060 }
1061
1062 return future;
1063 }
1064
1065 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001066 if (sw == null) {
1067 return null;
1068 }
1069
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001070 return getProcess(sw).queues.get(sw);
1071 }
1072
Naoki Shiotac1601d32013-11-20 10:47:34 -08001073 protected long getHash(IOFSwitch sw) {
1074 // This code assumes DPID is sequentially assigned.
1075 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001076 return sw.getId() % number_thread;
1077 }
1078
Naoki Shiotac1601d32013-11-20 10:47:34 -08001079 protected FlowPusherProcess getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001080 long hash = getHash(sw);
1081
1082 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001083 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001084
1085 @Override
1086 public String getName() {
1087 return "flowpusher";
1088 }
1089
1090 @Override
1091 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1092 return false;
1093 }
1094
1095 @Override
1096 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1097 return false;
1098 }
1099
1100 @Override
1101 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
1102 // This check can be skipped (since Controller must filter).
1103// if (msg.getType() != OFType.BARRIER_REPLY) {
1104// return Command.CONTINUE;
1105// }
1106
1107 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1108 if (map == null) {
1109 return Command.CONTINUE;
1110 }
1111
1112 OFBarrierReplyFuture future = map.get(msg.getXid());
1113 if (future == null) {
1114 return Command.CONTINUE;
1115 }
1116
1117 log.debug("Received BARRIER_REPLY : {}", msg);
1118 future.deliverFuture(sw, msg);
1119
1120 return Command.CONTINUE;
1121 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001122}