blob: 6f85c5d1a3aea6bd81c3e08b68d0eee63b2502dc [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080011import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080012import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070013
Naoki Shiota7d0cf272013-11-05 10:18:12 -080014import org.openflow.protocol.*;
15import org.openflow.protocol.action.*;
16import org.openflow.protocol.factory.BasicFactory;
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
20import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080027import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080028import net.floodlightcontroller.util.OFMessageDamper;
29import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080033import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080034import net.onrc.onos.ofcontroller.util.FlowEntryActions;
35import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080036import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
37import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
38import net.onrc.onos.ofcontroller.util.FlowPath;
39import net.onrc.onos.ofcontroller.util.IPv4Net;
40import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070041
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor, and
 * one thread can handle multiple message queues. Each queue is assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the switches
 * in round-robin order. The processing thread also tracks the sending rate to suppress
 * excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080054public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080055 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
56
57 // NOTE: Below are moved from FlowManager.
58 // TODO: Values copied from elsewhere (class LearningSwitch).
59 // The local copy should go away!
60 //
61 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
62 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080063
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080064 // Number of messages sent to switch at once
65 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080066
67 public static final short PRIORITY_DEFAULT = 100;
68 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
69 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
70
71 public enum QueueState {
72 READY,
73 SUSPENDED,
74 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070075
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080077 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080078 * This consists of queue itself and variables used for limiting sending rate.
79 * @author Naoki Shiota
80 *
81 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080082 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080083 private class SwitchQueue extends ArrayDeque<OFMessage> {
84 QueueState state;
85
Naoki Shiotae3199732013-11-25 16:14:43 -080086 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080087 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070088 long last_sent_time = 0;
89 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080090
Naoki Shiotae3199732013-11-25 16:14:43 -080091 // "To be deleted" flag
92 boolean toBeDeleted = false;
93
Naoki Shiota7d0cf272013-11-05 10:18:12 -080094 /**
95 * Check if sending rate is within the rate
96 * @param current Current time
97 * @return true if within the rate
98 */
99 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800100 if (max_rate == 0) {
101 // no limitation
102 return true;
103 }
104
Naoki Shiota81dbe302013-11-21 15:35:38 -0800105 if (current == last_sent_time) {
106 return false;
107 }
108
Naoki Shiotac1601d32013-11-20 10:47:34 -0800109 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800110 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800111 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800112 }
113
Naoki Shiota81dbe302013-11-21 15:35:38 -0800114 /**
115 * Log time and size of last sent data.
116 * @param current Time to be sent.
117 * @param size Size of sent data (in bytes).
118 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800119 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800120 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800121 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800122 }
123
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700124 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800125
Naoki Shiotac1601d32013-11-20 10:47:34 -0800126 private OFMessageDamper messageDamper = null;
127 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700128
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800129 private FloodlightContext context = null;
130 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800131
132 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800133 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800134 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800135 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
136 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800137
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800138 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800139
Naoki Shiota8739faa2013-11-18 17:00:25 -0800140 /**
141 * Main thread that reads messages from queues and sends them to switches.
142 * @author Naoki Shiota
143 *
144 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800145 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800146 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800147 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800148
Naoki Shiota81dbe302013-11-21 15:35:38 -0800149 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800150
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700151 @Override
152 public void run() {
153 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800154 try {
155 // wait for message pushed to queue
156 mutex.acquire();
157 } catch (InterruptedException e) {
158 e.printStackTrace();
159 log.debug("FlowPusherThread is interrupted");
160 return;
161 }
162
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800163 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
164 synchronized (queues) {
165 entries = queues.entrySet();
166 }
167
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800168 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800169 IOFSwitch sw = entry.getKey();
170 SwitchQueue queue = entry.getValue();
171
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700172 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800173 if (sw == null || queue == null ||
174 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700175 continue;
176 }
177
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800178 // check sending rate and determine it to be sent or not
Naoki Shiotab485d412013-11-26 12:04:19 -0800179 long current_time = System.currentTimeMillis();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800180 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800181
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800183 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800184 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800185 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800186 // Number of messages excess the limit
Naoki Shiota81dbe302013-11-21 15:35:38 -0800187 if (i >= MAX_MESSAGE_SEND) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800188 // Messages remains in queue
Naoki Shiota81dbe302013-11-21 15:35:38 -0800189 mutex.release();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800190 break;
191 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800192 ++i;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800193
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800194 OFMessage msg = queue.poll();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800195 try {
196 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800197 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800198 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800199 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800200 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800201 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800202 }
203 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800204 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800205 queue.logSentData(current_time, size);
Naoki Shiota81dbe302013-11-21 15:35:38 -0800206
Naoki Shiotae3199732013-11-25 16:14:43 -0800207 if (queue.isEmpty()) {
Naoki Shiotab485d412013-11-26 12:04:19 -0800208 // remove queue if flagged to be.
Naoki Shiotae3199732013-11-25 16:14:43 -0800209 if (queue.toBeDeleted) {
210 synchronized (queues) {
211 queues.remove(sw);
212 }
213 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800214 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700215 }
216 }
217 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700218 }
219 }
220 }
221
/**
 * Initialize object with one thread.
 */
public FlowPusher() {
}

/**
 * Initialize object with threads of given number.
 * @param number_thread Number of threads to handle messages.
 *        A non-positive value is ignored and the default of one thread
 *        is kept, because start() would otherwise create no processing
 *        thread at all.
 */
public FlowPusher(int number_thread) {
    if (number_thread > 0) {
        this.number_thread = number_thread;
    }
}
235
Naoki Shiotac1601d32013-11-20 10:47:34 -0800236 /**
237 * Set parameters needed for sending messages.
238 * @param context FloodlightContext used for sending messages.
239 * If null, FlowPusher uses default context.
240 * @param modContext FloodlightModuleContext used for acquiring
241 * ThreadPoolService and registering MessageListener.
242 * @param factory Factory object to create OFMessage objects.
243 * @param damper Message damper used for sending messages.
244 * If null, FlowPusher creates its own damper object.
245 */
246 public void init(FloodlightContext context,
247 FloodlightModuleContext modContext,
248 BasicFactory factory,
249 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700250 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800251 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800252 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
253 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
254 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800255
256 if (damper != null) {
257 messageDamper = damper;
258 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800259 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800260 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
261 EnumSet.of(OFType.FLOW_MOD),
262 OFMESSAGE_DAMPER_TIMEOUT);
263 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700264 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800265
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800266 /**
267 * Begin processing queue.
268 */
269 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800270 if (factory == null) {
271 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800272 return;
273 }
274
Naoki Shiota81dbe302013-11-21 15:35:38 -0800275 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800276 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800277 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800278
Naoki Shiota81dbe302013-11-21 15:35:38 -0800279 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800280 thread.start();
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
283
Brian O'Connor8c166a72013-11-14 18:41:48 -0800284 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800285 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800286 SwitchQueue queue = getQueue(sw);
287
288 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800289 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 }
291
292 synchronized (queue) {
293 if (queue.state == QueueState.READY) {
294 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800295 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800296 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800297 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800298 }
299 }
300
Brian O'Connor8c166a72013-11-14 18:41:48 -0800301 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800302 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800303 SwitchQueue queue = getQueue(sw);
304
305 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800306 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800307 }
308
309 synchronized (queue) {
310 if (queue.state == QueueState.SUSPENDED) {
311 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800312 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800314 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800315 }
316 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800317
Brian O'Connor8c166a72013-11-14 18:41:48 -0800318 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800319 public boolean isSuspended(IOFSwitch sw) {
320 SwitchQueue queue = getQueue(sw);
321
322 if (queue == null) {
323 // TODO Is true suitable for this case?
324 return true;
325 }
326
327 return (queue.state == QueueState.SUSPENDED);
328 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800329
330 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800331 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800332 */
333 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800334 if (threadMap == null) {
335 return;
336 }
337
Naoki Shiota81dbe302013-11-21 15:35:38 -0800338 for (FlowPusherThread t : threadMap.values()) {
339 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800340 }
341 }
342
Naoki Shiotae3199732013-11-25 16:14:43 -0800343 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800344 public void setRate(IOFSwitch sw, long rate) {
345 SwitchQueue queue = getQueue(sw);
346 if (queue == null) {
347 return;
348 }
349
350 if (rate > 0) {
351 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700352 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700353 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800354
355 @Override
356 public boolean createQueue(IOFSwitch sw) {
357 SwitchQueue queue = getQueue(sw);
358 if (queue != null) {
359 return false;
360 }
361
362 FlowPusherThread proc = getProcess(sw);
363 queue = new SwitchQueue();
364 queue.state = QueueState.READY;
365 synchronized (proc) {
366 proc.queues.put(sw, queue);
367 }
368
369 return true;
370 }
371
372 @Override
373 public boolean deleteQueue(IOFSwitch sw) {
374 return deleteQueue(sw, false);
375 }
376
377 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800378 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800379 FlowPusherThread proc = getProcess(sw);
380
Naoki Shiotab485d412013-11-26 12:04:19 -0800381 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800382 synchronized (proc.queues) {
383 SwitchQueue queue = proc.queues.remove(sw);
384 if (queue == null) {
385 return false;
386 }
387 }
388 return true;
389 } else {
390 SwitchQueue queue = getQueue(sw);
391 if (queue == null) {
392 return false;
393 }
394 synchronized (queue) {
395 queue.toBeDeleted = true;
396 }
397 return true;
398 }
399 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700400
Brian O'Connor8c166a72013-11-14 18:41:48 -0800401 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800402 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800403 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800404 SwitchQueue queue = proc.queues.get(sw);
405
Naoki Shiotab485d412013-11-26 12:04:19 -0800406 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800407 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800408 createQueue(sw);
409 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800410 }
411
412 synchronized (queue) {
413 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800414 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800415 }
416
Naoki Shiota81dbe302013-11-21 15:35:38 -0800417 if (proc.mutex.availablePermits() == 0) {
418 proc.mutex.release();
419 }
420
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800421 return true;
422 }
423
Brian O'Connor8c166a72013-11-14 18:41:48 -0800424 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800425 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800426 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800427 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
428 if (flowEntryIdStr == null)
429 return false;
430 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
431 String userState = flowEntryObj.getUserState();
432 if (userState == null)
433 return false;
434
435 //
436 // Create the Open Flow Flow Modification Entry to push
437 //
438 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
439 long cookie = flowEntryId.value();
440
441 short flowModCommand = OFFlowMod.OFPFC_ADD;
442 if (userState.equals("FE_USER_ADD")) {
443 flowModCommand = OFFlowMod.OFPFC_ADD;
444 } else if (userState.equals("FE_USER_MODIFY")) {
445 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
446 } else if (userState.equals("FE_USER_DELETE")) {
447 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
448 } else {
449 // Unknown user state. Ignore the entry
450 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
451 flowEntryId.toString(), userState);
452 return false;
453 }
454
455 //
456 // Fetch the match conditions.
457 //
458 // NOTE: The Flow matching conditions common for all Flow Entries are
459 // used ONLY if a Flow Entry does NOT have the corresponding matching
460 // condition set.
461 //
462 OFMatch match = new OFMatch();
463 match.setWildcards(OFMatch.OFPFW_ALL);
464
465 // Match the Incoming Port
466 Short matchInPort = flowEntryObj.getMatchInPort();
467 if (matchInPort != null) {
468 match.setInputPort(matchInPort);
469 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
470 }
471
472 // Match the Source MAC address
473 String matchSrcMac = flowEntryObj.getMatchSrcMac();
474 if (matchSrcMac == null)
475 matchSrcMac = flowObj.getMatchSrcMac();
476 if (matchSrcMac != null) {
477 match.setDataLayerSource(matchSrcMac);
478 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
479 }
480
481 // Match the Destination MAC address
482 String matchDstMac = flowEntryObj.getMatchDstMac();
483 if (matchDstMac == null)
484 matchDstMac = flowObj.getMatchDstMac();
485 if (matchDstMac != null) {
486 match.setDataLayerDestination(matchDstMac);
487 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
488 }
489
490 // Match the Ethernet Frame Type
491 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
492 if (matchEthernetFrameType == null)
493 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
494 if (matchEthernetFrameType != null) {
495 match.setDataLayerType(matchEthernetFrameType);
496 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
497 }
498
499 // Match the VLAN ID
500 Short matchVlanId = flowEntryObj.getMatchVlanId();
501 if (matchVlanId == null)
502 matchVlanId = flowObj.getMatchVlanId();
503 if (matchVlanId != null) {
504 match.setDataLayerVirtualLan(matchVlanId);
505 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
506 }
507
508 // Match the VLAN priority
509 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
510 if (matchVlanPriority == null)
511 matchVlanPriority = flowObj.getMatchVlanPriority();
512 if (matchVlanPriority != null) {
513 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
514 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
515 }
516
517 // Match the Source IPv4 Network prefix
518 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
519 if (matchSrcIPv4Net == null)
520 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
521 if (matchSrcIPv4Net != null) {
522 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
523 }
524
525 // Match the Destination IPv4 Network prefix
526 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
527 if (matchDstIPv4Net == null)
528 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
529 if (matchDstIPv4Net != null) {
530 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
531 }
532
533 // Match the IP protocol
534 Byte matchIpProto = flowEntryObj.getMatchIpProto();
535 if (matchIpProto == null)
536 matchIpProto = flowObj.getMatchIpProto();
537 if (matchIpProto != null) {
538 match.setNetworkProtocol(matchIpProto);
539 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
540 }
541
542 // Match the IP ToS (DSCP field, 6 bits)
543 Byte matchIpToS = flowEntryObj.getMatchIpToS();
544 if (matchIpToS == null)
545 matchIpToS = flowObj.getMatchIpToS();
546 if (matchIpToS != null) {
547 match.setNetworkTypeOfService(matchIpToS);
548 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
549 }
550
551 // Match the Source TCP/UDP port
552 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
553 if (matchSrcTcpUdpPort == null)
554 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
555 if (matchSrcTcpUdpPort != null) {
556 match.setTransportSource(matchSrcTcpUdpPort);
557 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
558 }
559
560 // Match the Destination TCP/UDP port
561 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
562 if (matchDstTcpUdpPort == null)
563 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
564 if (matchDstTcpUdpPort != null) {
565 match.setTransportDestination(matchDstTcpUdpPort);
566 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
567 }
568
569 //
570 // Fetch the actions
571 //
572 Short actionOutputPort = null;
573 List<OFAction> openFlowActions = new ArrayList<OFAction>();
574 int actionsLen = 0;
575 FlowEntryActions flowEntryActions = null;
576 String actionsStr = flowEntryObj.getActions();
577 if (actionsStr != null)
578 flowEntryActions = new FlowEntryActions(actionsStr);
579 else
580 flowEntryActions = new FlowEntryActions();
581 for (FlowEntryAction action : flowEntryActions.actions()) {
582 ActionOutput actionOutput = action.actionOutput();
583 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
584 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
585 ActionStripVlan actionStripVlan = action.actionStripVlan();
586 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
587 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
588 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
589 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
590 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
591 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
592 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
593 ActionEnqueue actionEnqueue = action.actionEnqueue();
594
595 if (actionOutput != null) {
596 actionOutputPort = actionOutput.port().value();
597 // XXX: The max length is hard-coded for now
598 OFActionOutput ofa =
599 new OFActionOutput(actionOutput.port().value(),
600 (short)0xffff);
601 openFlowActions.add(ofa);
602 actionsLen += ofa.getLength();
603 }
604
605 if (actionSetVlanId != null) {
606 OFActionVirtualLanIdentifier ofa =
607 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
608 openFlowActions.add(ofa);
609 actionsLen += ofa.getLength();
610 }
611
612 if (actionSetVlanPriority != null) {
613 OFActionVirtualLanPriorityCodePoint ofa =
614 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
615 openFlowActions.add(ofa);
616 actionsLen += ofa.getLength();
617 }
618
619 if (actionStripVlan != null) {
620 if (actionStripVlan.stripVlan() == true) {
621 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
622 openFlowActions.add(ofa);
623 actionsLen += ofa.getLength();
624 }
625 }
626
627 if (actionSetEthernetSrcAddr != null) {
628 OFActionDataLayerSource ofa =
629 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
630 openFlowActions.add(ofa);
631 actionsLen += ofa.getLength();
632 }
633
634 if (actionSetEthernetDstAddr != null) {
635 OFActionDataLayerDestination ofa =
636 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
637 openFlowActions.add(ofa);
638 actionsLen += ofa.getLength();
639 }
640
641 if (actionSetIPv4SrcAddr != null) {
642 OFActionNetworkLayerSource ofa =
643 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
644 openFlowActions.add(ofa);
645 actionsLen += ofa.getLength();
646 }
647
648 if (actionSetIPv4DstAddr != null) {
649 OFActionNetworkLayerDestination ofa =
650 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
651 openFlowActions.add(ofa);
652 actionsLen += ofa.getLength();
653 }
654
655 if (actionSetIpToS != null) {
656 OFActionNetworkTypeOfService ofa =
657 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
658 openFlowActions.add(ofa);
659 actionsLen += ofa.getLength();
660 }
661
662 if (actionSetTcpUdpSrcPort != null) {
663 OFActionTransportLayerSource ofa =
664 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
665 openFlowActions.add(ofa);
666 actionsLen += ofa.getLength();
667 }
668
669 if (actionSetTcpUdpDstPort != null) {
670 OFActionTransportLayerDestination ofa =
671 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
672 openFlowActions.add(ofa);
673 actionsLen += ofa.getLength();
674 }
675
676 if (actionEnqueue != null) {
677 OFActionEnqueue ofa =
678 new OFActionEnqueue(actionEnqueue.port().value(),
679 actionEnqueue.queueId());
680 openFlowActions.add(ofa);
681 actionsLen += ofa.getLength();
682 }
683 }
684
685 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
686 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
687 .setPriority(PRIORITY_DEFAULT)
688 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
689 .setCookie(cookie)
690 .setCommand(flowModCommand)
691 .setMatch(match)
692 .setActions(openFlowActions)
693 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
694 fm.setOutPort(OFPort.OFPP_NONE.getValue());
695 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
696 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
697 if (actionOutputPort != null)
698 fm.setOutPort(actionOutputPort);
699 }
700
701 //
702 // TODO: Set the following flag
703 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
704 // See method ForwardingBase::pushRoute()
705 //
706
707 //
708 // Write the message to the switch
709 //
710 log.debug("MEASUREMENT: Installing flow entry " + userState +
711 " into switch DPID: " +
712 sw.getStringId() +
713 " flowEntryId: " + flowEntryId.toString() +
714 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
715 " inPort: " + matchInPort + " outPort: " + actionOutputPort
716 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800717 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800718 //
719 // TODO: We should use the OpenFlow Barrier mechanism
720 // to check for errors, and update the SwitchState
721 // for a flow entry after the Barrier message is
722 // is received.
723 //
724 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
725
726 return true;
727 }
728
Brian O'Connor8c166a72013-11-14 18:41:48 -0800729 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800730 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
731 //
732 // Create the OpenFlow Flow Modification Entry to push
733 //
734 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
735 long cookie = flowEntry.flowEntryId().value();
736
737 short flowModCommand = OFFlowMod.OFPFC_ADD;
738 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
739 flowModCommand = OFFlowMod.OFPFC_ADD;
740 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
741 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
742 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
743 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
744 } else {
745 // Unknown user state. Ignore the entry
746 log.debug(
747 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
748 flowEntry.flowEntryId().toString(),
749 flowEntry.flowEntryUserState());
750 return false;
751 }
752
753 //
754 // Fetch the match conditions.
755 //
756 // NOTE: The Flow matching conditions common for all Flow Entries are
757 // used ONLY if a Flow Entry does NOT have the corresponding matching
758 // condition set.
759 //
760 OFMatch match = new OFMatch();
761 match.setWildcards(OFMatch.OFPFW_ALL);
762 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
763 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
764
765 // Match the Incoming Port
766 Port matchInPort = flowEntryMatch.inPort();
767 if (matchInPort != null) {
768 match.setInputPort(matchInPort.value());
769 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
770 }
771
772 // Match the Source MAC address
773 MACAddress matchSrcMac = flowEntryMatch.srcMac();
774 if ((matchSrcMac == null) && (flowPathMatch != null)) {
775 matchSrcMac = flowPathMatch.srcMac();
776 }
777 if (matchSrcMac != null) {
778 match.setDataLayerSource(matchSrcMac.toString());
779 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
780 }
781
782 // Match the Destination MAC address
783 MACAddress matchDstMac = flowEntryMatch.dstMac();
784 if ((matchDstMac == null) && (flowPathMatch != null)) {
785 matchDstMac = flowPathMatch.dstMac();
786 }
787 if (matchDstMac != null) {
788 match.setDataLayerDestination(matchDstMac.toString());
789 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
790 }
791
792 // Match the Ethernet Frame Type
793 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
794 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
795 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
796 }
797 if (matchEthernetFrameType != null) {
798 match.setDataLayerType(matchEthernetFrameType);
799 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
800 }
801
802 // Match the VLAN ID
803 Short matchVlanId = flowEntryMatch.vlanId();
804 if ((matchVlanId == null) && (flowPathMatch != null)) {
805 matchVlanId = flowPathMatch.vlanId();
806 }
807 if (matchVlanId != null) {
808 match.setDataLayerVirtualLan(matchVlanId);
809 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
810 }
811
812 // Match the VLAN priority
813 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
814 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
815 matchVlanPriority = flowPathMatch.vlanPriority();
816 }
817 if (matchVlanPriority != null) {
818 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
819 match.setWildcards(match.getWildcards()
820 & ~OFMatch.OFPFW_DL_VLAN_PCP);
821 }
822
823 // Match the Source IPv4 Network prefix
824 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
825 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
826 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
827 }
828 if (matchSrcIPv4Net != null) {
829 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
830 }
831
832 // Natch the Destination IPv4 Network prefix
833 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
834 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
835 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
836 }
837 if (matchDstIPv4Net != null) {
838 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
839 }
840
841 // Match the IP protocol
842 Byte matchIpProto = flowEntryMatch.ipProto();
843 if ((matchIpProto == null) && (flowPathMatch != null)) {
844 matchIpProto = flowPathMatch.ipProto();
845 }
846 if (matchIpProto != null) {
847 match.setNetworkProtocol(matchIpProto);
848 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
849 }
850
851 // Match the IP ToS (DSCP field, 6 bits)
852 Byte matchIpToS = flowEntryMatch.ipToS();
853 if ((matchIpToS == null) && (flowPathMatch != null)) {
854 matchIpToS = flowPathMatch.ipToS();
855 }
856 if (matchIpToS != null) {
857 match.setNetworkTypeOfService(matchIpToS);
858 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
859 }
860
861 // Match the Source TCP/UDP port
862 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
863 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
864 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
865 }
866 if (matchSrcTcpUdpPort != null) {
867 match.setTransportSource(matchSrcTcpUdpPort);
868 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
869 }
870
871 // Match the Destination TCP/UDP port
872 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
873 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
874 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
875 }
876 if (matchDstTcpUdpPort != null) {
877 match.setTransportDestination(matchDstTcpUdpPort);
878 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
879 }
880
881 //
882 // Fetch the actions
883 //
884 Short actionOutputPort = null;
885 List<OFAction> openFlowActions = new ArrayList<OFAction>();
886 int actionsLen = 0;
887 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
888 //
889 for (FlowEntryAction action : flowEntryActions.actions()) {
890 ActionOutput actionOutput = action.actionOutput();
891 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
892 ActionSetVlanPriority actionSetVlanPriority = action
893 .actionSetVlanPriority();
894 ActionStripVlan actionStripVlan = action.actionStripVlan();
895 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
896 .actionSetEthernetSrcAddr();
897 ActionSetEthernetAddr actionSetEthernetDstAddr = action
898 .actionSetEthernetDstAddr();
899 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
900 .actionSetIPv4SrcAddr();
901 ActionSetIPv4Addr actionSetIPv4DstAddr = action
902 .actionSetIPv4DstAddr();
903 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
904 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
905 .actionSetTcpUdpSrcPort();
906 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
907 .actionSetTcpUdpDstPort();
908 ActionEnqueue actionEnqueue = action.actionEnqueue();
909
910 if (actionOutput != null) {
911 actionOutputPort = actionOutput.port().value();
912 // XXX: The max length is hard-coded for now
913 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
914 .value(), (short) 0xffff);
915 openFlowActions.add(ofa);
916 actionsLen += ofa.getLength();
917 }
918
919 if (actionSetVlanId != null) {
920 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
921 actionSetVlanId.vlanId());
922 openFlowActions.add(ofa);
923 actionsLen += ofa.getLength();
924 }
925
926 if (actionSetVlanPriority != null) {
927 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
928 actionSetVlanPriority.vlanPriority());
929 openFlowActions.add(ofa);
930 actionsLen += ofa.getLength();
931 }
932
933 if (actionStripVlan != null) {
934 if (actionStripVlan.stripVlan() == true) {
935 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
936 openFlowActions.add(ofa);
937 actionsLen += ofa.getLength();
938 }
939 }
940
941 if (actionSetEthernetSrcAddr != null) {
942 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
943 actionSetEthernetSrcAddr.addr().toBytes());
944 openFlowActions.add(ofa);
945 actionsLen += ofa.getLength();
946 }
947
948 if (actionSetEthernetDstAddr != null) {
949 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
950 actionSetEthernetDstAddr.addr().toBytes());
951 openFlowActions.add(ofa);
952 actionsLen += ofa.getLength();
953 }
954
955 if (actionSetIPv4SrcAddr != null) {
956 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
957 actionSetIPv4SrcAddr.addr().value());
958 openFlowActions.add(ofa);
959 actionsLen += ofa.getLength();
960 }
961
962 if (actionSetIPv4DstAddr != null) {
963 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
964 actionSetIPv4DstAddr.addr().value());
965 openFlowActions.add(ofa);
966 actionsLen += ofa.getLength();
967 }
968
969 if (actionSetIpToS != null) {
970 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
971 actionSetIpToS.ipToS());
972 openFlowActions.add(ofa);
973 actionsLen += ofa.getLength();
974 }
975
976 if (actionSetTcpUdpSrcPort != null) {
977 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
978 actionSetTcpUdpSrcPort.port());
979 openFlowActions.add(ofa);
980 actionsLen += ofa.getLength();
981 }
982
983 if (actionSetTcpUdpDstPort != null) {
984 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
985 actionSetTcpUdpDstPort.port());
986 openFlowActions.add(ofa);
987 actionsLen += ofa.getLength();
988 }
989
990 if (actionEnqueue != null) {
991 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
992 .value(), actionEnqueue.queueId());
993 openFlowActions.add(ofa);
994 actionsLen += ofa.getLength();
995 }
996 }
997
998 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
999 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
1000 .setPriority(PRIORITY_DEFAULT)
1001 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
1002 .setCommand(flowModCommand).setMatch(match)
1003 .setActions(openFlowActions)
1004 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
1005 fm.setOutPort(OFPort.OFPP_NONE.getValue());
1006 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
1007 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
1008 if (actionOutputPort != null)
1009 fm.setOutPort(actionOutputPort);
1010 }
1011
1012 //
1013 // TODO: Set the following flag
1014 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1015 // See method ForwardingBase::pushRoute()
1016 //
1017
1018 //
1019 // Write the message to the switch
1020 //
1021 log.debug("MEASUREMENT: Installing flow entry "
1022 + flowEntry.flowEntryUserState() + " into switch DPID: "
1023 + sw.getStringId() + " flowEntryId: "
1024 + flowEntry.flowEntryId().toString() + " srcMac: "
1025 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1026 + matchInPort + " outPort: " + actionOutputPort);
1027
1028 //
1029 // TODO: We should use the OpenFlow Barrier mechanism
1030 // to check for errors, and update the SwitchState
1031 // for a flow entry after the Barrier message is
1032 // is received.
1033 //
1034 // TODO: The FlowEntry Object in Titan should be set
1035 // to FE_SWITCH_UPDATED.
1036 //
1037
1038 return add(sw,fm);
1039 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001040
1041 @Override
1042 public OFBarrierReply barrier(IOFSwitch sw) {
1043 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1044 if (future == null) {
1045 return null;
1046 }
1047
1048 try {
1049 return future.get();
1050 } catch (InterruptedException e) {
1051 e.printStackTrace();
1052 log.error("InterruptedException: {}", e);
1053 return null;
1054 } catch (ExecutionException e) {
1055 e.printStackTrace();
1056 log.error("ExecutionException: {}", e);
1057 return null;
1058 }
1059 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001060
Naoki Shiotac1601d32013-11-20 10:47:34 -08001061 @Override
1062 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1063 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001064
1065 if (sw == null) {
1066 return null;
1067 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001068
Naoki Shiota81dbe302013-11-21 15:35:38 -08001069 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001070 msg.setXid(sw.getNextTransactionId());
1071 add(sw, msg);
1072
1073 // TODO create Future object of message
1074 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiota81dbe302013-11-21 15:35:38 -08001075
Naoki Shiotac1601d32013-11-20 10:47:34 -08001076 synchronized (barrierFutures) {
1077 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1078 if (map == null) {
1079 map = new HashMap<Integer,OFBarrierReplyFuture>();
1080 barrierFutures.put(sw.getId(), map);
1081 }
1082 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001083 }
1084
1085 return future;
1086 }
1087
Naoki Shiotae3199732013-11-25 16:14:43 -08001088 /**
1089 * Get a queue attached to a switch.
1090 * @param sw Switch object
1091 * @return Queue object
1092 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001093 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001094 if (sw == null) {
1095 return null;
1096 }
1097
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001098 return getProcess(sw).queues.get(sw);
1099 }
1100
Naoki Shiotae3199732013-11-25 16:14:43 -08001101 /**
1102 * Get a hash value correspondent to a switch.
1103 * @param sw Switch object
1104 * @return Hash value
1105 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001106 protected long getHash(IOFSwitch sw) {
1107 // This code assumes DPID is sequentially assigned.
1108 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001109 return sw.getId() % number_thread;
1110 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001111
1112 /**
1113 * Get a Thread object which processes the queue attached to a switch.
1114 * @param sw Switch object
1115 * @return Thread object
1116 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001117 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001118 long hash = getHash(sw);
1119
1120 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001121 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001122
1123 @Override
1124 public String getName() {
1125 return "flowpusher";
1126 }
1127
1128 @Override
1129 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1130 return false;
1131 }
1132
1133 @Override
1134 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1135 return false;
1136 }
1137
1138 @Override
1139 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001140 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1141 if (map == null) {
1142 return Command.CONTINUE;
1143 }
1144
1145 OFBarrierReplyFuture future = map.get(msg.getXid());
1146 if (future == null) {
1147 return Command.CONTINUE;
1148 }
1149
1150 log.debug("Received BARRIER_REPLY : {}", msg);
1151 future.deliverFuture(sw, msg);
1152
1153 return Command.CONTINUE;
1154 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001155
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001156}