blob: e4aa3197f944e4ec4db48b0879a87a7cebce10a4 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08008import java.util.HashSet;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08009import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070010import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080011import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080012import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080013import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070014
Naoki Shiota7d0cf272013-11-05 10:18:12 -080015import org.openflow.protocol.*;
16import org.openflow.protocol.action.*;
17import org.openflow.protocol.factory.BasicFactory;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
21import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
31import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction;
33import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.onrc.onos.ofcontroller.util.FlowEntryActions;
36import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
39import net.onrc.onos.ofcontroller.util.FlowPath;
40import net.onrc.onos.ofcontroller.util.IPv4Net;
41import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable through the
 * constructor, and one thread can handle multiple message queues. Each queue
 * is assigned to a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin. The processing thread also calculates the sending
 * rate in order to suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
57
58 // NOTE: Below are moved from FlowManager.
59 // TODO: Values copied from elsewhere (class LearningSwitch).
60 // The local copy should go away!
61 //
62 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
63 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080064
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080065 // Number of messages sent to switch at once
66 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067
68 public static final short PRIORITY_DEFAULT = 100;
69 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
70 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
71
/**
 * Processing state of a per-switch message queue.
 */
public enum QueueState {
    /** The queue is processed by its pusher thread. */
    READY,
    /** The queue is skipped by its pusher thread until it is resumed. */
    SUSPENDED
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070076
Naoki Shiotac1601d32013-11-20 10:47:34 -080077 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080078 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080079 * This consists of queue itself and variables used for limiting sending rate.
80 * @author Naoki Shiota
81 *
82 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080083 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080084 private class SwitchQueue extends ArrayDeque<OFMessage> {
85 QueueState state;
86
Naoki Shiotae3199732013-11-25 16:14:43 -080087 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080088 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070089 long last_sent_time = 0;
90 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091
Naoki Shiotae3199732013-11-25 16:14:43 -080092 // "To be deleted" flag
93 boolean toBeDeleted = false;
94
Naoki Shiota7d0cf272013-11-05 10:18:12 -080095 /**
96 * Check if sending rate is within the rate
97 * @param current Current time
98 * @return true if within the rate
99 */
100 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800101 if (max_rate == 0) {
102 // no limitation
103 return true;
104 }
105
Naoki Shiota81dbe302013-11-21 15:35:38 -0800106 if (current == last_sent_time) {
107 return false;
108 }
109
Naoki Shiotac1601d32013-11-20 10:47:34 -0800110 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800111 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800112 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800113 }
114
Naoki Shiota81dbe302013-11-21 15:35:38 -0800115 /**
116 * Log time and size of last sent data.
117 * @param current Time to be sent.
118 * @param size Size of sent data (in bytes).
119 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800120 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800122 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123 }
124
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700125 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800126
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 private OFMessageDamper messageDamper = null;
128 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700129
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 private FloodlightContext context = null;
131 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800132
133 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800134 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800135 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800136 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
137 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800139 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140
Naoki Shiota8739faa2013-11-18 17:00:25 -0800141 /**
142 * Main thread that reads messages from queues and sends them to switches.
143 * @author Naoki Shiota
144 *
145 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800146 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800147 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800148 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800149
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800150 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800151 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800152
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700153 @Override
154 public void run() {
155 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800156 try {
157 // wait for message pushed to queue
158 mutex.acquire();
159 } catch (InterruptedException e) {
160 e.printStackTrace();
161 log.debug("FlowPusherThread is interrupted");
162 return;
163 }
164
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800165 // for safety of concurrent access, copy all key objects
166 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800167 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800168 for (IOFSwitch sw : queues.keySet()) {
169 keys.add(sw);
170 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800171 }
172
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800173 for (IOFSwitch sw : keys) {
174 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800175
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700176 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800177 if (sw == null || queue == null ||
178 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700179 continue;
180 }
181
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800183 processQueue(sw, queue, MAX_MESSAGE_SEND);
184 if (queue.isEmpty()) {
185 // remove queue if flagged to be.
186 if (queue.toBeDeleted) {
187 synchronized (queues) {
188 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800189 }
190 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800191 } else {
192 // if some messages remains in queue, latch down
193 if (mutex.availablePermits() == 0) {
194 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800195 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700196 }
197 }
198 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700199 }
200 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800201
202 /**
203 * Read messages from queue and send them to the switch.
204 * If number of messages excess the limit, stop sending messages.
205 * @param sw Switch to which messages will be sent.
206 * @param queue Queue of messages.
207 * @param max_msg Limitation of number of messages to be sent. If set to 0,
208 * all messages in queue will be sent.
209 */
210 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
211 // check sending rate and determine it to be sent or not
212 long current_time = System.currentTimeMillis();
213 long size = 0;
214
215 if (queue.isSendable(current_time)) {
216 int i = 0;
217 while (! queue.isEmpty()) {
218 // Number of messages excess the limit
219 if (0 < max_msg && max_msg <= i) {
220 break;
221 }
222 ++i;
223
224 OFMessage msg = queue.poll();
225 try {
226 messageDamper.write(sw, msg, context);
227 log.debug("Pusher sends message : {}", msg);
228 size += msg.getLength();
229 } catch (IOException e) {
230 e.printStackTrace();
231 log.error("Exception in sending message ({}) : {}", msg, e);
232 }
233 }
234 sw.flush();
235 queue.logSentData(current_time, size);
236 }
237 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700238 }
239
Naoki Shiotac1601d32013-11-20 10:47:34 -0800240 /**
241 * Initialize object with one thread.
242 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800243 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800244 }
245
/**
 * Create a pusher that will run the given number of threads.
 * @param number_thread Number of threads to handle messages.
 */
public FlowPusher(int number_thread) {
    this.number_thread = number_thread;
}
253
Naoki Shiotac1601d32013-11-20 10:47:34 -0800254 /**
255 * Set parameters needed for sending messages.
256 * @param context FloodlightContext used for sending messages.
257 * If null, FlowPusher uses default context.
258 * @param modContext FloodlightModuleContext used for acquiring
259 * ThreadPoolService and registering MessageListener.
260 * @param factory Factory object to create OFMessage objects.
261 * @param damper Message damper used for sending messages.
262 * If null, FlowPusher creates its own damper object.
263 */
264 public void init(FloodlightContext context,
265 FloodlightModuleContext modContext,
266 BasicFactory factory,
267 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700268 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800269 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800270 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
271 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
272 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800273
274 if (damper != null) {
275 messageDamper = damper;
276 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800277 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800278 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
279 EnumSet.of(OFType.FLOW_MOD),
280 OFMESSAGE_DAMPER_TIMEOUT);
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800283
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284 /**
285 * Begin processing queue.
286 */
287 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800288 if (factory == null) {
289 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 return;
291 }
292
Naoki Shiota81dbe302013-11-21 15:35:38 -0800293 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800294 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800295 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800296
Naoki Shiota81dbe302013-11-21 15:35:38 -0800297 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800298 thread.start();
299 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700300 }
301
Brian O'Connor8c166a72013-11-14 18:41:48 -0800302 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800303 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800304 SwitchQueue queue = getQueue(sw);
305
306 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800307 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800308 }
309
310 synchronized (queue) {
311 if (queue.state == QueueState.READY) {
312 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800313 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
317 }
318
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 SwitchQueue queue = getQueue(sw);
322
323 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800324 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800325 }
326
327 synchronized (queue) {
328 if (queue.state == QueueState.SUSPENDED) {
329 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800330 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800331 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800332 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800333 }
334 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800335
Brian O'Connor8c166a72013-11-14 18:41:48 -0800336 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800337 public boolean isSuspended(IOFSwitch sw) {
338 SwitchQueue queue = getQueue(sw);
339
340 if (queue == null) {
341 // TODO Is true suitable for this case?
342 return true;
343 }
344
345 return (queue.state == QueueState.SUSPENDED);
346 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800347
348 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800349 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800350 */
351 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800352 if (threadMap == null) {
353 return;
354 }
355
Naoki Shiota81dbe302013-11-21 15:35:38 -0800356 for (FlowPusherThread t : threadMap.values()) {
357 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800358 }
359 }
360
Naoki Shiotae3199732013-11-25 16:14:43 -0800361 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800362 public void setRate(IOFSwitch sw, long rate) {
363 SwitchQueue queue = getQueue(sw);
364 if (queue == null) {
365 return;
366 }
367
368 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800369 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800370 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700371 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700372 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800373
374 @Override
375 public boolean createQueue(IOFSwitch sw) {
376 SwitchQueue queue = getQueue(sw);
377 if (queue != null) {
378 return false;
379 }
380
381 FlowPusherThread proc = getProcess(sw);
382 queue = new SwitchQueue();
383 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800384 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800385 proc.queues.put(sw, queue);
386 }
387
388 return true;
389 }
390
391 @Override
392 public boolean deleteQueue(IOFSwitch sw) {
393 return deleteQueue(sw, false);
394 }
395
396 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800397 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800398 FlowPusherThread proc = getProcess(sw);
399
Naoki Shiotab485d412013-11-26 12:04:19 -0800400 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800401 synchronized (proc.queues) {
402 SwitchQueue queue = proc.queues.remove(sw);
403 if (queue == null) {
404 return false;
405 }
406 }
407 return true;
408 } else {
409 SwitchQueue queue = getQueue(sw);
410 if (queue == null) {
411 return false;
412 }
413 synchronized (queue) {
414 queue.toBeDeleted = true;
415 }
416 return true;
417 }
418 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700419
Brian O'Connor8c166a72013-11-14 18:41:48 -0800420 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800421 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800422 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800423 SwitchQueue queue = proc.queues.get(sw);
424
Naoki Shiotab485d412013-11-26 12:04:19 -0800425 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800426 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800427 createQueue(sw);
428 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800429 }
430
431 synchronized (queue) {
432 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800433 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800434 }
435
Naoki Shiota81dbe302013-11-21 15:35:38 -0800436 if (proc.mutex.availablePermits() == 0) {
437 proc.mutex.release();
438 }
439
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800440 return true;
441 }
442
Brian O'Connor8c166a72013-11-14 18:41:48 -0800443 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800444 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800445 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800446 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
447 if (flowEntryIdStr == null)
448 return false;
449 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
450 String userState = flowEntryObj.getUserState();
451 if (userState == null)
452 return false;
453
454 //
455 // Create the Open Flow Flow Modification Entry to push
456 //
457 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
458 long cookie = flowEntryId.value();
459
460 short flowModCommand = OFFlowMod.OFPFC_ADD;
461 if (userState.equals("FE_USER_ADD")) {
462 flowModCommand = OFFlowMod.OFPFC_ADD;
463 } else if (userState.equals("FE_USER_MODIFY")) {
464 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
465 } else if (userState.equals("FE_USER_DELETE")) {
466 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
467 } else {
468 // Unknown user state. Ignore the entry
469 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
470 flowEntryId.toString(), userState);
471 return false;
472 }
473
474 //
475 // Fetch the match conditions.
476 //
477 // NOTE: The Flow matching conditions common for all Flow Entries are
478 // used ONLY if a Flow Entry does NOT have the corresponding matching
479 // condition set.
480 //
481 OFMatch match = new OFMatch();
482 match.setWildcards(OFMatch.OFPFW_ALL);
483
484 // Match the Incoming Port
485 Short matchInPort = flowEntryObj.getMatchInPort();
486 if (matchInPort != null) {
487 match.setInputPort(matchInPort);
488 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
489 }
490
491 // Match the Source MAC address
492 String matchSrcMac = flowEntryObj.getMatchSrcMac();
493 if (matchSrcMac == null)
494 matchSrcMac = flowObj.getMatchSrcMac();
495 if (matchSrcMac != null) {
496 match.setDataLayerSource(matchSrcMac);
497 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
498 }
499
500 // Match the Destination MAC address
501 String matchDstMac = flowEntryObj.getMatchDstMac();
502 if (matchDstMac == null)
503 matchDstMac = flowObj.getMatchDstMac();
504 if (matchDstMac != null) {
505 match.setDataLayerDestination(matchDstMac);
506 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
507 }
508
509 // Match the Ethernet Frame Type
510 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
511 if (matchEthernetFrameType == null)
512 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
513 if (matchEthernetFrameType != null) {
514 match.setDataLayerType(matchEthernetFrameType);
515 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
516 }
517
518 // Match the VLAN ID
519 Short matchVlanId = flowEntryObj.getMatchVlanId();
520 if (matchVlanId == null)
521 matchVlanId = flowObj.getMatchVlanId();
522 if (matchVlanId != null) {
523 match.setDataLayerVirtualLan(matchVlanId);
524 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
525 }
526
527 // Match the VLAN priority
528 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
529 if (matchVlanPriority == null)
530 matchVlanPriority = flowObj.getMatchVlanPriority();
531 if (matchVlanPriority != null) {
532 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
533 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
534 }
535
536 // Match the Source IPv4 Network prefix
537 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
538 if (matchSrcIPv4Net == null)
539 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
540 if (matchSrcIPv4Net != null) {
541 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
542 }
543
544 // Match the Destination IPv4 Network prefix
545 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
546 if (matchDstIPv4Net == null)
547 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
548 if (matchDstIPv4Net != null) {
549 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
550 }
551
552 // Match the IP protocol
553 Byte matchIpProto = flowEntryObj.getMatchIpProto();
554 if (matchIpProto == null)
555 matchIpProto = flowObj.getMatchIpProto();
556 if (matchIpProto != null) {
557 match.setNetworkProtocol(matchIpProto);
558 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
559 }
560
561 // Match the IP ToS (DSCP field, 6 bits)
562 Byte matchIpToS = flowEntryObj.getMatchIpToS();
563 if (matchIpToS == null)
564 matchIpToS = flowObj.getMatchIpToS();
565 if (matchIpToS != null) {
566 match.setNetworkTypeOfService(matchIpToS);
567 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
568 }
569
570 // Match the Source TCP/UDP port
571 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
572 if (matchSrcTcpUdpPort == null)
573 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
574 if (matchSrcTcpUdpPort != null) {
575 match.setTransportSource(matchSrcTcpUdpPort);
576 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
577 }
578
579 // Match the Destination TCP/UDP port
580 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
581 if (matchDstTcpUdpPort == null)
582 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
583 if (matchDstTcpUdpPort != null) {
584 match.setTransportDestination(matchDstTcpUdpPort);
585 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
586 }
587
588 //
589 // Fetch the actions
590 //
591 Short actionOutputPort = null;
592 List<OFAction> openFlowActions = new ArrayList<OFAction>();
593 int actionsLen = 0;
594 FlowEntryActions flowEntryActions = null;
595 String actionsStr = flowEntryObj.getActions();
596 if (actionsStr != null)
597 flowEntryActions = new FlowEntryActions(actionsStr);
598 else
599 flowEntryActions = new FlowEntryActions();
600 for (FlowEntryAction action : flowEntryActions.actions()) {
601 ActionOutput actionOutput = action.actionOutput();
602 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
603 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
604 ActionStripVlan actionStripVlan = action.actionStripVlan();
605 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
606 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
607 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
608 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
609 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
610 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
611 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
612 ActionEnqueue actionEnqueue = action.actionEnqueue();
613
614 if (actionOutput != null) {
615 actionOutputPort = actionOutput.port().value();
616 // XXX: The max length is hard-coded for now
617 OFActionOutput ofa =
618 new OFActionOutput(actionOutput.port().value(),
619 (short)0xffff);
620 openFlowActions.add(ofa);
621 actionsLen += ofa.getLength();
622 }
623
624 if (actionSetVlanId != null) {
625 OFActionVirtualLanIdentifier ofa =
626 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
627 openFlowActions.add(ofa);
628 actionsLen += ofa.getLength();
629 }
630
631 if (actionSetVlanPriority != null) {
632 OFActionVirtualLanPriorityCodePoint ofa =
633 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
634 openFlowActions.add(ofa);
635 actionsLen += ofa.getLength();
636 }
637
638 if (actionStripVlan != null) {
639 if (actionStripVlan.stripVlan() == true) {
640 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
641 openFlowActions.add(ofa);
642 actionsLen += ofa.getLength();
643 }
644 }
645
646 if (actionSetEthernetSrcAddr != null) {
647 OFActionDataLayerSource ofa =
648 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
649 openFlowActions.add(ofa);
650 actionsLen += ofa.getLength();
651 }
652
653 if (actionSetEthernetDstAddr != null) {
654 OFActionDataLayerDestination ofa =
655 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
656 openFlowActions.add(ofa);
657 actionsLen += ofa.getLength();
658 }
659
660 if (actionSetIPv4SrcAddr != null) {
661 OFActionNetworkLayerSource ofa =
662 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
663 openFlowActions.add(ofa);
664 actionsLen += ofa.getLength();
665 }
666
667 if (actionSetIPv4DstAddr != null) {
668 OFActionNetworkLayerDestination ofa =
669 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
670 openFlowActions.add(ofa);
671 actionsLen += ofa.getLength();
672 }
673
674 if (actionSetIpToS != null) {
675 OFActionNetworkTypeOfService ofa =
676 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
677 openFlowActions.add(ofa);
678 actionsLen += ofa.getLength();
679 }
680
681 if (actionSetTcpUdpSrcPort != null) {
682 OFActionTransportLayerSource ofa =
683 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
684 openFlowActions.add(ofa);
685 actionsLen += ofa.getLength();
686 }
687
688 if (actionSetTcpUdpDstPort != null) {
689 OFActionTransportLayerDestination ofa =
690 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
691 openFlowActions.add(ofa);
692 actionsLen += ofa.getLength();
693 }
694
695 if (actionEnqueue != null) {
696 OFActionEnqueue ofa =
697 new OFActionEnqueue(actionEnqueue.port().value(),
698 actionEnqueue.queueId());
699 openFlowActions.add(ofa);
700 actionsLen += ofa.getLength();
701 }
702 }
703
704 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
705 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
706 .setPriority(PRIORITY_DEFAULT)
707 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
708 .setCookie(cookie)
709 .setCommand(flowModCommand)
710 .setMatch(match)
711 .setActions(openFlowActions)
712 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
713 fm.setOutPort(OFPort.OFPP_NONE.getValue());
714 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
715 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
716 if (actionOutputPort != null)
717 fm.setOutPort(actionOutputPort);
718 }
719
720 //
721 // TODO: Set the following flag
722 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
723 // See method ForwardingBase::pushRoute()
724 //
725
726 //
727 // Write the message to the switch
728 //
729 log.debug("MEASUREMENT: Installing flow entry " + userState +
730 " into switch DPID: " +
731 sw.getStringId() +
732 " flowEntryId: " + flowEntryId.toString() +
733 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
734 " inPort: " + matchInPort + " outPort: " + actionOutputPort
735 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800736 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800737 //
738 // TODO: We should use the OpenFlow Barrier mechanism
739 // to check for errors, and update the SwitchState
740 // for a flow entry after the Barrier message is
741 // is received.
742 //
743 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
744
745 return true;
746 }
747
Brian O'Connor8c166a72013-11-14 18:41:48 -0800748 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800749 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
750 //
751 // Create the OpenFlow Flow Modification Entry to push
752 //
753 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
754 long cookie = flowEntry.flowEntryId().value();
755
756 short flowModCommand = OFFlowMod.OFPFC_ADD;
757 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
758 flowModCommand = OFFlowMod.OFPFC_ADD;
759 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
760 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
761 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
762 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
763 } else {
764 // Unknown user state. Ignore the entry
765 log.debug(
766 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
767 flowEntry.flowEntryId().toString(),
768 flowEntry.flowEntryUserState());
769 return false;
770 }
771
772 //
773 // Fetch the match conditions.
774 //
775 // NOTE: The Flow matching conditions common for all Flow Entries are
776 // used ONLY if a Flow Entry does NOT have the corresponding matching
777 // condition set.
778 //
779 OFMatch match = new OFMatch();
780 match.setWildcards(OFMatch.OFPFW_ALL);
781 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
782 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
783
784 // Match the Incoming Port
785 Port matchInPort = flowEntryMatch.inPort();
786 if (matchInPort != null) {
787 match.setInputPort(matchInPort.value());
788 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
789 }
790
791 // Match the Source MAC address
792 MACAddress matchSrcMac = flowEntryMatch.srcMac();
793 if ((matchSrcMac == null) && (flowPathMatch != null)) {
794 matchSrcMac = flowPathMatch.srcMac();
795 }
796 if (matchSrcMac != null) {
797 match.setDataLayerSource(matchSrcMac.toString());
798 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
799 }
800
801 // Match the Destination MAC address
802 MACAddress matchDstMac = flowEntryMatch.dstMac();
803 if ((matchDstMac == null) && (flowPathMatch != null)) {
804 matchDstMac = flowPathMatch.dstMac();
805 }
806 if (matchDstMac != null) {
807 match.setDataLayerDestination(matchDstMac.toString());
808 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
809 }
810
811 // Match the Ethernet Frame Type
812 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
813 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
814 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
815 }
816 if (matchEthernetFrameType != null) {
817 match.setDataLayerType(matchEthernetFrameType);
818 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
819 }
820
821 // Match the VLAN ID
822 Short matchVlanId = flowEntryMatch.vlanId();
823 if ((matchVlanId == null) && (flowPathMatch != null)) {
824 matchVlanId = flowPathMatch.vlanId();
825 }
826 if (matchVlanId != null) {
827 match.setDataLayerVirtualLan(matchVlanId);
828 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
829 }
830
831 // Match the VLAN priority
832 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
833 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
834 matchVlanPriority = flowPathMatch.vlanPriority();
835 }
836 if (matchVlanPriority != null) {
837 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
838 match.setWildcards(match.getWildcards()
839 & ~OFMatch.OFPFW_DL_VLAN_PCP);
840 }
841
842 // Match the Source IPv4 Network prefix
843 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
844 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
845 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
846 }
847 if (matchSrcIPv4Net != null) {
848 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
849 }
850
851 // Natch the Destination IPv4 Network prefix
852 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
853 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
854 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
855 }
856 if (matchDstIPv4Net != null) {
857 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
858 }
859
860 // Match the IP protocol
861 Byte matchIpProto = flowEntryMatch.ipProto();
862 if ((matchIpProto == null) && (flowPathMatch != null)) {
863 matchIpProto = flowPathMatch.ipProto();
864 }
865 if (matchIpProto != null) {
866 match.setNetworkProtocol(matchIpProto);
867 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
868 }
869
870 // Match the IP ToS (DSCP field, 6 bits)
871 Byte matchIpToS = flowEntryMatch.ipToS();
872 if ((matchIpToS == null) && (flowPathMatch != null)) {
873 matchIpToS = flowPathMatch.ipToS();
874 }
875 if (matchIpToS != null) {
876 match.setNetworkTypeOfService(matchIpToS);
877 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
878 }
879
880 // Match the Source TCP/UDP port
881 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
882 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
883 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
884 }
885 if (matchSrcTcpUdpPort != null) {
886 match.setTransportSource(matchSrcTcpUdpPort);
887 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
888 }
889
890 // Match the Destination TCP/UDP port
891 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
892 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
893 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
894 }
895 if (matchDstTcpUdpPort != null) {
896 match.setTransportDestination(matchDstTcpUdpPort);
897 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
898 }
899
900 //
901 // Fetch the actions
902 //
903 Short actionOutputPort = null;
904 List<OFAction> openFlowActions = new ArrayList<OFAction>();
905 int actionsLen = 0;
906 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
907 //
908 for (FlowEntryAction action : flowEntryActions.actions()) {
909 ActionOutput actionOutput = action.actionOutput();
910 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
911 ActionSetVlanPriority actionSetVlanPriority = action
912 .actionSetVlanPriority();
913 ActionStripVlan actionStripVlan = action.actionStripVlan();
914 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
915 .actionSetEthernetSrcAddr();
916 ActionSetEthernetAddr actionSetEthernetDstAddr = action
917 .actionSetEthernetDstAddr();
918 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
919 .actionSetIPv4SrcAddr();
920 ActionSetIPv4Addr actionSetIPv4DstAddr = action
921 .actionSetIPv4DstAddr();
922 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
923 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
924 .actionSetTcpUdpSrcPort();
925 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
926 .actionSetTcpUdpDstPort();
927 ActionEnqueue actionEnqueue = action.actionEnqueue();
928
929 if (actionOutput != null) {
930 actionOutputPort = actionOutput.port().value();
931 // XXX: The max length is hard-coded for now
932 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
933 .value(), (short) 0xffff);
934 openFlowActions.add(ofa);
935 actionsLen += ofa.getLength();
936 }
937
938 if (actionSetVlanId != null) {
939 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
940 actionSetVlanId.vlanId());
941 openFlowActions.add(ofa);
942 actionsLen += ofa.getLength();
943 }
944
945 if (actionSetVlanPriority != null) {
946 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
947 actionSetVlanPriority.vlanPriority());
948 openFlowActions.add(ofa);
949 actionsLen += ofa.getLength();
950 }
951
952 if (actionStripVlan != null) {
953 if (actionStripVlan.stripVlan() == true) {
954 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
955 openFlowActions.add(ofa);
956 actionsLen += ofa.getLength();
957 }
958 }
959
960 if (actionSetEthernetSrcAddr != null) {
961 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
962 actionSetEthernetSrcAddr.addr().toBytes());
963 openFlowActions.add(ofa);
964 actionsLen += ofa.getLength();
965 }
966
967 if (actionSetEthernetDstAddr != null) {
968 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
969 actionSetEthernetDstAddr.addr().toBytes());
970 openFlowActions.add(ofa);
971 actionsLen += ofa.getLength();
972 }
973
974 if (actionSetIPv4SrcAddr != null) {
975 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
976 actionSetIPv4SrcAddr.addr().value());
977 openFlowActions.add(ofa);
978 actionsLen += ofa.getLength();
979 }
980
981 if (actionSetIPv4DstAddr != null) {
982 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
983 actionSetIPv4DstAddr.addr().value());
984 openFlowActions.add(ofa);
985 actionsLen += ofa.getLength();
986 }
987
988 if (actionSetIpToS != null) {
989 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
990 actionSetIpToS.ipToS());
991 openFlowActions.add(ofa);
992 actionsLen += ofa.getLength();
993 }
994
995 if (actionSetTcpUdpSrcPort != null) {
996 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
997 actionSetTcpUdpSrcPort.port());
998 openFlowActions.add(ofa);
999 actionsLen += ofa.getLength();
1000 }
1001
1002 if (actionSetTcpUdpDstPort != null) {
1003 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
1004 actionSetTcpUdpDstPort.port());
1005 openFlowActions.add(ofa);
1006 actionsLen += ofa.getLength();
1007 }
1008
1009 if (actionEnqueue != null) {
1010 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
1011 .value(), actionEnqueue.queueId());
1012 openFlowActions.add(ofa);
1013 actionsLen += ofa.getLength();
1014 }
1015 }
1016
1017 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
1018 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
1019 .setPriority(PRIORITY_DEFAULT)
1020 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
1021 .setCommand(flowModCommand).setMatch(match)
1022 .setActions(openFlowActions)
1023 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
1024 fm.setOutPort(OFPort.OFPP_NONE.getValue());
1025 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
1026 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
1027 if (actionOutputPort != null)
1028 fm.setOutPort(actionOutputPort);
1029 }
1030
1031 //
1032 // TODO: Set the following flag
1033 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1034 // See method ForwardingBase::pushRoute()
1035 //
1036
1037 //
1038 // Write the message to the switch
1039 //
1040 log.debug("MEASUREMENT: Installing flow entry "
1041 + flowEntry.flowEntryUserState() + " into switch DPID: "
1042 + sw.getStringId() + " flowEntryId: "
1043 + flowEntry.flowEntryId().toString() + " srcMac: "
1044 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1045 + matchInPort + " outPort: " + actionOutputPort);
1046
1047 //
1048 // TODO: We should use the OpenFlow Barrier mechanism
1049 // to check for errors, and update the SwitchState
1050 // for a flow entry after the Barrier message is
1051 // is received.
1052 //
1053 // TODO: The FlowEntry Object in Titan should be set
1054 // to FE_SWITCH_UPDATED.
1055 //
1056
1057 return add(sw,fm);
1058 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001059
1060 @Override
1061 public OFBarrierReply barrier(IOFSwitch sw) {
1062 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1063 if (future == null) {
1064 return null;
1065 }
1066
1067 try {
1068 return future.get();
1069 } catch (InterruptedException e) {
1070 e.printStackTrace();
1071 log.error("InterruptedException: {}", e);
1072 return null;
1073 } catch (ExecutionException e) {
1074 e.printStackTrace();
1075 log.error("ExecutionException: {}", e);
1076 return null;
1077 }
1078 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001079
Naoki Shiotac1601d32013-11-20 10:47:34 -08001080 @Override
1081 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1082 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001083
1084 if (sw == null) {
1085 return null;
1086 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001087
Naoki Shiota81dbe302013-11-21 15:35:38 -08001088 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001089 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001090
Naoki Shiotac1601d32013-11-20 10:47:34 -08001091 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001092 synchronized (barrierFutures) {
1093 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1094 if (map == null) {
1095 map = new HashMap<Integer,OFBarrierReplyFuture>();
1096 barrierFutures.put(sw.getId(), map);
1097 }
1098 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001099 }
1100
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001101 add(sw, msg);
1102
Naoki Shiotac1601d32013-11-20 10:47:34 -08001103 return future;
1104 }
1105
Naoki Shiotae3199732013-11-25 16:14:43 -08001106 /**
1107 * Get a queue attached to a switch.
1108 * @param sw Switch object
1109 * @return Queue object
1110 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001111 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001112 if (sw == null) {
1113 return null;
1114 }
1115
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001116 return getProcess(sw).queues.get(sw);
1117 }
1118
Naoki Shiotae3199732013-11-25 16:14:43 -08001119 /**
1120 * Get a hash value correspondent to a switch.
1121 * @param sw Switch object
1122 * @return Hash value
1123 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001124 protected long getHash(IOFSwitch sw) {
1125 // This code assumes DPID is sequentially assigned.
1126 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001127 return sw.getId() % number_thread;
1128 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001129
1130 /**
1131 * Get a Thread object which processes the queue attached to a switch.
1132 * @param sw Switch object
1133 * @return Thread object
1134 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001135 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001136 long hash = getHash(sw);
1137
1138 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001139 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001140
1141 @Override
1142 public String getName() {
1143 return "flowpusher";
1144 }
1145
1146 @Override
1147 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1148 return false;
1149 }
1150
1151 @Override
1152 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1153 return false;
1154 }
1155
1156 @Override
1157 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001158 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1159 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001160 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001161 return Command.CONTINUE;
1162 }
1163
1164 OFBarrierReplyFuture future = map.get(msg.getXid());
1165 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001166 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001167 return Command.CONTINUE;
1168 }
1169
1170 log.debug("Received BARRIER_REPLY : {}", msg);
1171 future.deliverFuture(sw, msg);
1172
1173 return Command.CONTINUE;
1174 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001175
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001176}