blob: 349f69d4a4061015697c0d4c7702ae9350ac36ef [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08008import java.util.HashSet;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08009import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070010import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080011import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080012import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080013import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070014
Naoki Shiota7d0cf272013-11-05 10:18:12 -080015import org.openflow.protocol.*;
16import org.openflow.protocol.action.*;
17import org.openflow.protocol.factory.BasicFactory;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
21import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
31import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction;
33import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.onrc.onos.ofcontroller.util.FlowEntryActions;
36import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
39import net.onrc.onos.ofcontroller.util.FlowPath;
40import net.onrc.onos.ofcontroller.util.IPv4Net;
41import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable through the
 * constructor, and one thread can handle multiple message queues. Each queue
 * is assigned to a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin. The processing thread also calculates the sending
 * rate in order to suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
57
58 // NOTE: Below are moved from FlowManager.
59 // TODO: Values copied from elsewhere (class LearningSwitch).
60 // The local copy should go away!
61 //
62 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
63 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080064
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080065 // Number of messages sent to switch at once
66 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067
68 public static final short PRIORITY_DEFAULT = 100;
69 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
70 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
71
/**
 * State of a message queue attached to a switch.
 */
public enum QueueState {
    /** Messages in the queue are sent to the switch. */
    READY,
    /** The queue is skipped by the processing thread. */
    SUSPENDED
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070076
Naoki Shiotac1601d32013-11-20 10:47:34 -080077 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080078 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080079 * This consists of queue itself and variables used for limiting sending rate.
80 * @author Naoki Shiota
81 *
82 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080083 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080084 private class SwitchQueue extends ArrayDeque<OFMessage> {
85 QueueState state;
86
Naoki Shiotae3199732013-11-25 16:14:43 -080087 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080088 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070089 long last_sent_time = 0;
90 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091
Naoki Shiotae3199732013-11-25 16:14:43 -080092 // "To be deleted" flag
93 boolean toBeDeleted = false;
94
Naoki Shiota7d0cf272013-11-05 10:18:12 -080095 /**
96 * Check if sending rate is within the rate
97 * @param current Current time
98 * @return true if within the rate
99 */
100 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800101 if (max_rate == 0) {
102 // no limitation
103 return true;
104 }
105
Naoki Shiota81dbe302013-11-21 15:35:38 -0800106 if (current == last_sent_time) {
107 return false;
108 }
109
Naoki Shiotac1601d32013-11-20 10:47:34 -0800110 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800111 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800112 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800113 }
114
Naoki Shiota81dbe302013-11-21 15:35:38 -0800115 /**
116 * Log time and size of last sent data.
117 * @param current Time to be sent.
118 * @param size Size of sent data (in bytes).
119 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800120 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800122 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123 }
124
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700125 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800126
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 private OFMessageDamper messageDamper = null;
128 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700129
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 private FloodlightContext context = null;
131 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800132
133 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800134 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800135 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800136 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
137 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800139 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140
Naoki Shiota8739faa2013-11-18 17:00:25 -0800141 /**
142 * Main thread that reads messages from queues and sends them to switches.
143 * @author Naoki Shiota
144 *
145 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800146 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800147 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800148 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800149
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800150 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800151 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800152
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700153 @Override
154 public void run() {
155 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800156 try {
157 // wait for message pushed to queue
158 mutex.acquire();
159 } catch (InterruptedException e) {
160 e.printStackTrace();
161 log.debug("FlowPusherThread is interrupted");
162 return;
163 }
164
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800165 // for safety of concurrent access, copy all key objects
166 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800167 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800168 for (IOFSwitch sw : queues.keySet()) {
169 keys.add(sw);
170 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800171 }
172
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800173 for (IOFSwitch sw : keys) {
174 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800175
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700176 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800177 if (sw == null || queue == null ||
178 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700179 continue;
180 }
181
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800183 processQueue(sw, queue, MAX_MESSAGE_SEND);
184 if (queue.isEmpty()) {
185 // remove queue if flagged to be.
186 if (queue.toBeDeleted) {
187 synchronized (queues) {
188 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800189 }
190 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800191 } else {
192 // if some messages remains in queue, latch down
193 if (mutex.availablePermits() == 0) {
194 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800195 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700196 }
197 }
198 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700199 }
200 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800201
202 /**
203 * Read messages from queue and send them to the switch.
204 * If number of messages excess the limit, stop sending messages.
205 * @param sw Switch to which messages will be sent.
206 * @param queue Queue of messages.
207 * @param max_msg Limitation of number of messages to be sent. If set to 0,
208 * all messages in queue will be sent.
209 */
210 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
211 // check sending rate and determine it to be sent or not
212 long current_time = System.currentTimeMillis();
213 long size = 0;
214
215 if (queue.isSendable(current_time)) {
216 int i = 0;
217 while (! queue.isEmpty()) {
218 // Number of messages excess the limit
219 if (0 < max_msg && max_msg <= i) {
220 break;
221 }
222 ++i;
223
224 OFMessage msg = queue.poll();
225 try {
226 messageDamper.write(sw, msg, context);
227 log.debug("Pusher sends message : {}", msg);
228 size += msg.getLength();
229 } catch (IOException e) {
230 e.printStackTrace();
231 log.error("Exception in sending message ({}) : {}", msg, e);
232 }
233 }
234 sw.flush();
235 queue.logSentData(current_time, size);
236 }
237 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700238 }
239
/**
 * Initialize object with one thread.
 */
public FlowPusher() {
    this(1);
}

/**
 * Initialize object with threads of given number.
 * The threads themselves are created later, by start().
 * @param number_thread Number of threads to handle messages.
 */
public FlowPusher(int number_thread) {
    this.number_thread = number_thread;
}
253
Naoki Shiotac1601d32013-11-20 10:47:34 -0800254 /**
255 * Set parameters needed for sending messages.
256 * @param context FloodlightContext used for sending messages.
257 * If null, FlowPusher uses default context.
258 * @param modContext FloodlightModuleContext used for acquiring
259 * ThreadPoolService and registering MessageListener.
260 * @param factory Factory object to create OFMessage objects.
261 * @param damper Message damper used for sending messages.
262 * If null, FlowPusher creates its own damper object.
263 */
264 public void init(FloodlightContext context,
265 FloodlightModuleContext modContext,
266 BasicFactory factory,
267 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700268 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800269 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800270 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
271 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
272 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800273
274 if (damper != null) {
275 messageDamper = damper;
276 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800277 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800278 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
279 EnumSet.of(OFType.FLOW_MOD),
280 OFMESSAGE_DAMPER_TIMEOUT);
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800283
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284 /**
285 * Begin processing queue.
286 */
287 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800288 if (factory == null) {
289 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 return;
291 }
292
Naoki Shiota81dbe302013-11-21 15:35:38 -0800293 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800294 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800295 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800296
Naoki Shiota81dbe302013-11-21 15:35:38 -0800297 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800298 thread.start();
299 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700300 }
301
Brian O'Connor8c166a72013-11-14 18:41:48 -0800302 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800303 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800304 SwitchQueue queue = getQueue(sw);
305
306 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800307 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800308 }
309
310 synchronized (queue) {
311 if (queue.state == QueueState.READY) {
312 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800313 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
317 }
318
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 SwitchQueue queue = getQueue(sw);
322
323 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800324 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800325 }
326
327 synchronized (queue) {
328 if (queue.state == QueueState.SUSPENDED) {
329 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800330 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800331 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800332 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800333 }
334 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800335
Brian O'Connor8c166a72013-11-14 18:41:48 -0800336 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800337 public boolean isSuspended(IOFSwitch sw) {
338 SwitchQueue queue = getQueue(sw);
339
340 if (queue == null) {
341 // TODO Is true suitable for this case?
342 return true;
343 }
344
345 return (queue.state == QueueState.SUSPENDED);
346 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800347
348 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800349 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800350 */
351 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800352 if (threadMap == null) {
353 return;
354 }
355
Naoki Shiota81dbe302013-11-21 15:35:38 -0800356 for (FlowPusherThread t : threadMap.values()) {
357 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800358 }
359 }
360
Naoki Shiotae3199732013-11-25 16:14:43 -0800361 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800362 public void setRate(IOFSwitch sw, long rate) {
363 SwitchQueue queue = getQueue(sw);
364 if (queue == null) {
365 return;
366 }
367
368 if (rate > 0) {
369 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700370 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700371 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800372
373 @Override
374 public boolean createQueue(IOFSwitch sw) {
375 SwitchQueue queue = getQueue(sw);
376 if (queue != null) {
377 return false;
378 }
379
380 FlowPusherThread proc = getProcess(sw);
381 queue = new SwitchQueue();
382 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800383 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800384 proc.queues.put(sw, queue);
385 }
386
387 return true;
388 }
389
390 @Override
391 public boolean deleteQueue(IOFSwitch sw) {
392 return deleteQueue(sw, false);
393 }
394
395 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800396 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800397 FlowPusherThread proc = getProcess(sw);
398
Naoki Shiotab485d412013-11-26 12:04:19 -0800399 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800400 synchronized (proc.queues) {
401 SwitchQueue queue = proc.queues.remove(sw);
402 if (queue == null) {
403 return false;
404 }
405 }
406 return true;
407 } else {
408 SwitchQueue queue = getQueue(sw);
409 if (queue == null) {
410 return false;
411 }
412 synchronized (queue) {
413 queue.toBeDeleted = true;
414 }
415 return true;
416 }
417 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700418
Brian O'Connor8c166a72013-11-14 18:41:48 -0800419 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800420 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800421 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800422 SwitchQueue queue = proc.queues.get(sw);
423
Naoki Shiotab485d412013-11-26 12:04:19 -0800424 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800425 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800426 createQueue(sw);
427 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800428 }
429
430 synchronized (queue) {
431 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800432 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800433 }
434
Naoki Shiota81dbe302013-11-21 15:35:38 -0800435 if (proc.mutex.availablePermits() == 0) {
436 proc.mutex.release();
437 }
438
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800439 return true;
440 }
441
Brian O'Connor8c166a72013-11-14 18:41:48 -0800442 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800443 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800444 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800445 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
446 if (flowEntryIdStr == null)
447 return false;
448 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
449 String userState = flowEntryObj.getUserState();
450 if (userState == null)
451 return false;
452
453 //
454 // Create the Open Flow Flow Modification Entry to push
455 //
456 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
457 long cookie = flowEntryId.value();
458
459 short flowModCommand = OFFlowMod.OFPFC_ADD;
460 if (userState.equals("FE_USER_ADD")) {
461 flowModCommand = OFFlowMod.OFPFC_ADD;
462 } else if (userState.equals("FE_USER_MODIFY")) {
463 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
464 } else if (userState.equals("FE_USER_DELETE")) {
465 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
466 } else {
467 // Unknown user state. Ignore the entry
468 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
469 flowEntryId.toString(), userState);
470 return false;
471 }
472
473 //
474 // Fetch the match conditions.
475 //
476 // NOTE: The Flow matching conditions common for all Flow Entries are
477 // used ONLY if a Flow Entry does NOT have the corresponding matching
478 // condition set.
479 //
480 OFMatch match = new OFMatch();
481 match.setWildcards(OFMatch.OFPFW_ALL);
482
483 // Match the Incoming Port
484 Short matchInPort = flowEntryObj.getMatchInPort();
485 if (matchInPort != null) {
486 match.setInputPort(matchInPort);
487 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
488 }
489
490 // Match the Source MAC address
491 String matchSrcMac = flowEntryObj.getMatchSrcMac();
492 if (matchSrcMac == null)
493 matchSrcMac = flowObj.getMatchSrcMac();
494 if (matchSrcMac != null) {
495 match.setDataLayerSource(matchSrcMac);
496 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
497 }
498
499 // Match the Destination MAC address
500 String matchDstMac = flowEntryObj.getMatchDstMac();
501 if (matchDstMac == null)
502 matchDstMac = flowObj.getMatchDstMac();
503 if (matchDstMac != null) {
504 match.setDataLayerDestination(matchDstMac);
505 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
506 }
507
508 // Match the Ethernet Frame Type
509 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
510 if (matchEthernetFrameType == null)
511 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
512 if (matchEthernetFrameType != null) {
513 match.setDataLayerType(matchEthernetFrameType);
514 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
515 }
516
517 // Match the VLAN ID
518 Short matchVlanId = flowEntryObj.getMatchVlanId();
519 if (matchVlanId == null)
520 matchVlanId = flowObj.getMatchVlanId();
521 if (matchVlanId != null) {
522 match.setDataLayerVirtualLan(matchVlanId);
523 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
524 }
525
526 // Match the VLAN priority
527 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
528 if (matchVlanPriority == null)
529 matchVlanPriority = flowObj.getMatchVlanPriority();
530 if (matchVlanPriority != null) {
531 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
532 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
533 }
534
535 // Match the Source IPv4 Network prefix
536 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
537 if (matchSrcIPv4Net == null)
538 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
539 if (matchSrcIPv4Net != null) {
540 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
541 }
542
543 // Match the Destination IPv4 Network prefix
544 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
545 if (matchDstIPv4Net == null)
546 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
547 if (matchDstIPv4Net != null) {
548 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
549 }
550
551 // Match the IP protocol
552 Byte matchIpProto = flowEntryObj.getMatchIpProto();
553 if (matchIpProto == null)
554 matchIpProto = flowObj.getMatchIpProto();
555 if (matchIpProto != null) {
556 match.setNetworkProtocol(matchIpProto);
557 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
558 }
559
560 // Match the IP ToS (DSCP field, 6 bits)
561 Byte matchIpToS = flowEntryObj.getMatchIpToS();
562 if (matchIpToS == null)
563 matchIpToS = flowObj.getMatchIpToS();
564 if (matchIpToS != null) {
565 match.setNetworkTypeOfService(matchIpToS);
566 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
567 }
568
569 // Match the Source TCP/UDP port
570 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
571 if (matchSrcTcpUdpPort == null)
572 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
573 if (matchSrcTcpUdpPort != null) {
574 match.setTransportSource(matchSrcTcpUdpPort);
575 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
576 }
577
578 // Match the Destination TCP/UDP port
579 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
580 if (matchDstTcpUdpPort == null)
581 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
582 if (matchDstTcpUdpPort != null) {
583 match.setTransportDestination(matchDstTcpUdpPort);
584 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
585 }
586
587 //
588 // Fetch the actions
589 //
590 Short actionOutputPort = null;
591 List<OFAction> openFlowActions = new ArrayList<OFAction>();
592 int actionsLen = 0;
593 FlowEntryActions flowEntryActions = null;
594 String actionsStr = flowEntryObj.getActions();
595 if (actionsStr != null)
596 flowEntryActions = new FlowEntryActions(actionsStr);
597 else
598 flowEntryActions = new FlowEntryActions();
599 for (FlowEntryAction action : flowEntryActions.actions()) {
600 ActionOutput actionOutput = action.actionOutput();
601 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
602 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
603 ActionStripVlan actionStripVlan = action.actionStripVlan();
604 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
605 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
606 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
607 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
608 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
609 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
610 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
611 ActionEnqueue actionEnqueue = action.actionEnqueue();
612
613 if (actionOutput != null) {
614 actionOutputPort = actionOutput.port().value();
615 // XXX: The max length is hard-coded for now
616 OFActionOutput ofa =
617 new OFActionOutput(actionOutput.port().value(),
618 (short)0xffff);
619 openFlowActions.add(ofa);
620 actionsLen += ofa.getLength();
621 }
622
623 if (actionSetVlanId != null) {
624 OFActionVirtualLanIdentifier ofa =
625 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
626 openFlowActions.add(ofa);
627 actionsLen += ofa.getLength();
628 }
629
630 if (actionSetVlanPriority != null) {
631 OFActionVirtualLanPriorityCodePoint ofa =
632 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
633 openFlowActions.add(ofa);
634 actionsLen += ofa.getLength();
635 }
636
637 if (actionStripVlan != null) {
638 if (actionStripVlan.stripVlan() == true) {
639 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
640 openFlowActions.add(ofa);
641 actionsLen += ofa.getLength();
642 }
643 }
644
645 if (actionSetEthernetSrcAddr != null) {
646 OFActionDataLayerSource ofa =
647 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
648 openFlowActions.add(ofa);
649 actionsLen += ofa.getLength();
650 }
651
652 if (actionSetEthernetDstAddr != null) {
653 OFActionDataLayerDestination ofa =
654 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
655 openFlowActions.add(ofa);
656 actionsLen += ofa.getLength();
657 }
658
659 if (actionSetIPv4SrcAddr != null) {
660 OFActionNetworkLayerSource ofa =
661 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
662 openFlowActions.add(ofa);
663 actionsLen += ofa.getLength();
664 }
665
666 if (actionSetIPv4DstAddr != null) {
667 OFActionNetworkLayerDestination ofa =
668 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
669 openFlowActions.add(ofa);
670 actionsLen += ofa.getLength();
671 }
672
673 if (actionSetIpToS != null) {
674 OFActionNetworkTypeOfService ofa =
675 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
676 openFlowActions.add(ofa);
677 actionsLen += ofa.getLength();
678 }
679
680 if (actionSetTcpUdpSrcPort != null) {
681 OFActionTransportLayerSource ofa =
682 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
683 openFlowActions.add(ofa);
684 actionsLen += ofa.getLength();
685 }
686
687 if (actionSetTcpUdpDstPort != null) {
688 OFActionTransportLayerDestination ofa =
689 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
690 openFlowActions.add(ofa);
691 actionsLen += ofa.getLength();
692 }
693
694 if (actionEnqueue != null) {
695 OFActionEnqueue ofa =
696 new OFActionEnqueue(actionEnqueue.port().value(),
697 actionEnqueue.queueId());
698 openFlowActions.add(ofa);
699 actionsLen += ofa.getLength();
700 }
701 }
702
703 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
704 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
705 .setPriority(PRIORITY_DEFAULT)
706 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
707 .setCookie(cookie)
708 .setCommand(flowModCommand)
709 .setMatch(match)
710 .setActions(openFlowActions)
711 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
712 fm.setOutPort(OFPort.OFPP_NONE.getValue());
713 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
714 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
715 if (actionOutputPort != null)
716 fm.setOutPort(actionOutputPort);
717 }
718
719 //
720 // TODO: Set the following flag
721 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
722 // See method ForwardingBase::pushRoute()
723 //
724
725 //
726 // Write the message to the switch
727 //
728 log.debug("MEASUREMENT: Installing flow entry " + userState +
729 " into switch DPID: " +
730 sw.getStringId() +
731 " flowEntryId: " + flowEntryId.toString() +
732 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
733 " inPort: " + matchInPort + " outPort: " + actionOutputPort
734 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800735 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800736 //
737 // TODO: We should use the OpenFlow Barrier mechanism
738 // to check for errors, and update the SwitchState
739 // for a flow entry after the Barrier message is
740 // is received.
741 //
742 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
743
744 return true;
745 }
746
Brian O'Connor8c166a72013-11-14 18:41:48 -0800747 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800748 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
749 //
750 // Create the OpenFlow Flow Modification Entry to push
751 //
752 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
753 long cookie = flowEntry.flowEntryId().value();
754
755 short flowModCommand = OFFlowMod.OFPFC_ADD;
756 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
757 flowModCommand = OFFlowMod.OFPFC_ADD;
758 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
759 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
760 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
761 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
762 } else {
763 // Unknown user state. Ignore the entry
764 log.debug(
765 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
766 flowEntry.flowEntryId().toString(),
767 flowEntry.flowEntryUserState());
768 return false;
769 }
770
771 //
772 // Fetch the match conditions.
773 //
774 // NOTE: The Flow matching conditions common for all Flow Entries are
775 // used ONLY if a Flow Entry does NOT have the corresponding matching
776 // condition set.
777 //
778 OFMatch match = new OFMatch();
779 match.setWildcards(OFMatch.OFPFW_ALL);
780 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
781 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
782
783 // Match the Incoming Port
784 Port matchInPort = flowEntryMatch.inPort();
785 if (matchInPort != null) {
786 match.setInputPort(matchInPort.value());
787 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
788 }
789
790 // Match the Source MAC address
791 MACAddress matchSrcMac = flowEntryMatch.srcMac();
792 if ((matchSrcMac == null) && (flowPathMatch != null)) {
793 matchSrcMac = flowPathMatch.srcMac();
794 }
795 if (matchSrcMac != null) {
796 match.setDataLayerSource(matchSrcMac.toString());
797 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
798 }
799
800 // Match the Destination MAC address
801 MACAddress matchDstMac = flowEntryMatch.dstMac();
802 if ((matchDstMac == null) && (flowPathMatch != null)) {
803 matchDstMac = flowPathMatch.dstMac();
804 }
805 if (matchDstMac != null) {
806 match.setDataLayerDestination(matchDstMac.toString());
807 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
808 }
809
810 // Match the Ethernet Frame Type
811 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
812 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
813 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
814 }
815 if (matchEthernetFrameType != null) {
816 match.setDataLayerType(matchEthernetFrameType);
817 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
818 }
819
820 // Match the VLAN ID
821 Short matchVlanId = flowEntryMatch.vlanId();
822 if ((matchVlanId == null) && (flowPathMatch != null)) {
823 matchVlanId = flowPathMatch.vlanId();
824 }
825 if (matchVlanId != null) {
826 match.setDataLayerVirtualLan(matchVlanId);
827 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
828 }
829
830 // Match the VLAN priority
831 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
832 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
833 matchVlanPriority = flowPathMatch.vlanPriority();
834 }
835 if (matchVlanPriority != null) {
836 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
837 match.setWildcards(match.getWildcards()
838 & ~OFMatch.OFPFW_DL_VLAN_PCP);
839 }
840
841 // Match the Source IPv4 Network prefix
842 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
843 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
844 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
845 }
846 if (matchSrcIPv4Net != null) {
847 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
848 }
849
850 // Natch the Destination IPv4 Network prefix
851 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
852 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
853 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
854 }
855 if (matchDstIPv4Net != null) {
856 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
857 }
858
859 // Match the IP protocol
860 Byte matchIpProto = flowEntryMatch.ipProto();
861 if ((matchIpProto == null) && (flowPathMatch != null)) {
862 matchIpProto = flowPathMatch.ipProto();
863 }
864 if (matchIpProto != null) {
865 match.setNetworkProtocol(matchIpProto);
866 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
867 }
868
869 // Match the IP ToS (DSCP field, 6 bits)
870 Byte matchIpToS = flowEntryMatch.ipToS();
871 if ((matchIpToS == null) && (flowPathMatch != null)) {
872 matchIpToS = flowPathMatch.ipToS();
873 }
874 if (matchIpToS != null) {
875 match.setNetworkTypeOfService(matchIpToS);
876 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
877 }
878
879 // Match the Source TCP/UDP port
880 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
881 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
882 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
883 }
884 if (matchSrcTcpUdpPort != null) {
885 match.setTransportSource(matchSrcTcpUdpPort);
886 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
887 }
888
889 // Match the Destination TCP/UDP port
890 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
891 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
892 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
893 }
894 if (matchDstTcpUdpPort != null) {
895 match.setTransportDestination(matchDstTcpUdpPort);
896 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
897 }
898
899 //
900 // Fetch the actions
901 //
902 Short actionOutputPort = null;
903 List<OFAction> openFlowActions = new ArrayList<OFAction>();
904 int actionsLen = 0;
905 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
906 //
907 for (FlowEntryAction action : flowEntryActions.actions()) {
908 ActionOutput actionOutput = action.actionOutput();
909 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
910 ActionSetVlanPriority actionSetVlanPriority = action
911 .actionSetVlanPriority();
912 ActionStripVlan actionStripVlan = action.actionStripVlan();
913 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
914 .actionSetEthernetSrcAddr();
915 ActionSetEthernetAddr actionSetEthernetDstAddr = action
916 .actionSetEthernetDstAddr();
917 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
918 .actionSetIPv4SrcAddr();
919 ActionSetIPv4Addr actionSetIPv4DstAddr = action
920 .actionSetIPv4DstAddr();
921 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
922 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
923 .actionSetTcpUdpSrcPort();
924 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
925 .actionSetTcpUdpDstPort();
926 ActionEnqueue actionEnqueue = action.actionEnqueue();
927
928 if (actionOutput != null) {
929 actionOutputPort = actionOutput.port().value();
930 // XXX: The max length is hard-coded for now
931 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
932 .value(), (short) 0xffff);
933 openFlowActions.add(ofa);
934 actionsLen += ofa.getLength();
935 }
936
937 if (actionSetVlanId != null) {
938 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
939 actionSetVlanId.vlanId());
940 openFlowActions.add(ofa);
941 actionsLen += ofa.getLength();
942 }
943
944 if (actionSetVlanPriority != null) {
945 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
946 actionSetVlanPriority.vlanPriority());
947 openFlowActions.add(ofa);
948 actionsLen += ofa.getLength();
949 }
950
951 if (actionStripVlan != null) {
952 if (actionStripVlan.stripVlan() == true) {
953 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
954 openFlowActions.add(ofa);
955 actionsLen += ofa.getLength();
956 }
957 }
958
959 if (actionSetEthernetSrcAddr != null) {
960 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
961 actionSetEthernetSrcAddr.addr().toBytes());
962 openFlowActions.add(ofa);
963 actionsLen += ofa.getLength();
964 }
965
966 if (actionSetEthernetDstAddr != null) {
967 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
968 actionSetEthernetDstAddr.addr().toBytes());
969 openFlowActions.add(ofa);
970 actionsLen += ofa.getLength();
971 }
972
973 if (actionSetIPv4SrcAddr != null) {
974 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
975 actionSetIPv4SrcAddr.addr().value());
976 openFlowActions.add(ofa);
977 actionsLen += ofa.getLength();
978 }
979
980 if (actionSetIPv4DstAddr != null) {
981 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
982 actionSetIPv4DstAddr.addr().value());
983 openFlowActions.add(ofa);
984 actionsLen += ofa.getLength();
985 }
986
987 if (actionSetIpToS != null) {
988 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
989 actionSetIpToS.ipToS());
990 openFlowActions.add(ofa);
991 actionsLen += ofa.getLength();
992 }
993
994 if (actionSetTcpUdpSrcPort != null) {
995 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
996 actionSetTcpUdpSrcPort.port());
997 openFlowActions.add(ofa);
998 actionsLen += ofa.getLength();
999 }
1000
1001 if (actionSetTcpUdpDstPort != null) {
1002 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
1003 actionSetTcpUdpDstPort.port());
1004 openFlowActions.add(ofa);
1005 actionsLen += ofa.getLength();
1006 }
1007
1008 if (actionEnqueue != null) {
1009 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
1010 .value(), actionEnqueue.queueId());
1011 openFlowActions.add(ofa);
1012 actionsLen += ofa.getLength();
1013 }
1014 }
1015
1016 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
1017 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
1018 .setPriority(PRIORITY_DEFAULT)
1019 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
1020 .setCommand(flowModCommand).setMatch(match)
1021 .setActions(openFlowActions)
1022 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
1023 fm.setOutPort(OFPort.OFPP_NONE.getValue());
1024 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
1025 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
1026 if (actionOutputPort != null)
1027 fm.setOutPort(actionOutputPort);
1028 }
1029
1030 //
1031 // TODO: Set the following flag
1032 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1033 // See method ForwardingBase::pushRoute()
1034 //
1035
1036 //
1037 // Write the message to the switch
1038 //
1039 log.debug("MEASUREMENT: Installing flow entry "
1040 + flowEntry.flowEntryUserState() + " into switch DPID: "
1041 + sw.getStringId() + " flowEntryId: "
1042 + flowEntry.flowEntryId().toString() + " srcMac: "
1043 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1044 + matchInPort + " outPort: " + actionOutputPort);
1045
1046 //
1047 // TODO: We should use the OpenFlow Barrier mechanism
1048 // to check for errors, and update the SwitchState
1049 // for a flow entry after the Barrier message is
1050 // is received.
1051 //
1052 // TODO: The FlowEntry Object in Titan should be set
1053 // to FE_SWITCH_UPDATED.
1054 //
1055
1056 return add(sw,fm);
1057 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001058
1059 @Override
1060 public OFBarrierReply barrier(IOFSwitch sw) {
1061 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1062 if (future == null) {
1063 return null;
1064 }
1065
1066 try {
1067 return future.get();
1068 } catch (InterruptedException e) {
1069 e.printStackTrace();
1070 log.error("InterruptedException: {}", e);
1071 return null;
1072 } catch (ExecutionException e) {
1073 e.printStackTrace();
1074 log.error("ExecutionException: {}", e);
1075 return null;
1076 }
1077 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001078
Naoki Shiotac1601d32013-11-20 10:47:34 -08001079 @Override
1080 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1081 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001082
1083 if (sw == null) {
1084 return null;
1085 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001086
Naoki Shiota81dbe302013-11-21 15:35:38 -08001087 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001088 msg.setXid(sw.getNextTransactionId());
1089 add(sw, msg);
1090
1091 // TODO create Future object of message
1092 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiota81dbe302013-11-21 15:35:38 -08001093
Naoki Shiotac1601d32013-11-20 10:47:34 -08001094 synchronized (barrierFutures) {
1095 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1096 if (map == null) {
1097 map = new HashMap<Integer,OFBarrierReplyFuture>();
1098 barrierFutures.put(sw.getId(), map);
1099 }
1100 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001101 }
1102
1103 return future;
1104 }
1105
Naoki Shiotae3199732013-11-25 16:14:43 -08001106 /**
1107 * Get a queue attached to a switch.
1108 * @param sw Switch object
1109 * @return Queue object
1110 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001111 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001112 if (sw == null) {
1113 return null;
1114 }
1115
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001116 return getProcess(sw).queues.get(sw);
1117 }
1118
Naoki Shiotae3199732013-11-25 16:14:43 -08001119 /**
1120 * Get a hash value correspondent to a switch.
1121 * @param sw Switch object
1122 * @return Hash value
1123 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001124 protected long getHash(IOFSwitch sw) {
1125 // This code assumes DPID is sequentially assigned.
1126 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001127 return sw.getId() % number_thread;
1128 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001129
1130 /**
1131 * Get a Thread object which processes the queue attached to a switch.
1132 * @param sw Switch object
1133 * @return Thread object
1134 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001135 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001136 long hash = getHash(sw);
1137
1138 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001139 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001140
1141 @Override
1142 public String getName() {
1143 return "flowpusher";
1144 }
1145
1146 @Override
1147 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1148 return false;
1149 }
1150
1151 @Override
1152 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1153 return false;
1154 }
1155
1156 @Override
1157 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001158 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1159 if (map == null) {
1160 return Command.CONTINUE;
1161 }
1162
1163 OFBarrierReplyFuture future = map.get(msg.getXid());
1164 if (future == null) {
1165 return Command.CONTINUE;
1166 }
1167
1168 log.debug("Received BARRIER_REPLY : {}", msg);
1169 future.deliverFuture(sw, msg);
1170
1171 return Command.CONTINUE;
1172 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001173
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001174}