blob: 8f2469bb72f3e94d4d5a946f3dc4e2e4c3bd17de [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08008import java.util.HashSet;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08009import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070010import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080011import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080012import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080013import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070014
Naoki Shiota7d0cf272013-11-05 10:18:12 -080015import org.openflow.protocol.*;
16import org.openflow.protocol.action.*;
17import org.openflow.protocol.factory.BasicFactory;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
21import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
31import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction;
33import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.onrc.onos.ofcontroller.util.FlowEntryActions;
36import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
39import net.onrc.onos.ofcontroller.util.FlowPath;
40import net.onrc.onos.ofcontroller.util.IPv4Net;
41import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable by constructor,
 * and one thread can handle multiple message queues. Each queue is assigned
 * to a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to
 * the switches in round-robin. The processing thread also calculates the
 * sending rate to suppress excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
57
58 // NOTE: Below are moved from FlowManager.
59 // TODO: Values copied from elsewhere (class LearningSwitch).
60 // The local copy should go away!
61 //
62 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
63 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080064
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080065 // Number of messages sent to switch at once
66 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067
68 public static final short PRIORITY_DEFAULT = 100;
69 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
70 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
71
72 public enum QueueState {
73 READY,
74 SUSPENDED,
75 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070076
Naoki Shiotac1601d32013-11-20 10:47:34 -080077 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080078 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080079 * This consists of queue itself and variables used for limiting sending rate.
80 * @author Naoki Shiota
81 *
82 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080083 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080084 private class SwitchQueue extends ArrayDeque<OFMessage> {
85 QueueState state;
86
Naoki Shiotae3199732013-11-25 16:14:43 -080087 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080088 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070089 long last_sent_time = 0;
90 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091
Naoki Shiotae3199732013-11-25 16:14:43 -080092 // "To be deleted" flag
93 boolean toBeDeleted = false;
94
Naoki Shiota7d0cf272013-11-05 10:18:12 -080095 /**
96 * Check if sending rate is within the rate
97 * @param current Current time
98 * @return true if within the rate
99 */
100 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800101 if (max_rate == 0) {
102 // no limitation
103 return true;
104 }
105
Naoki Shiota81dbe302013-11-21 15:35:38 -0800106 if (current == last_sent_time) {
107 return false;
108 }
109
Naoki Shiotac1601d32013-11-20 10:47:34 -0800110 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800111 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800112 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800113 }
114
Naoki Shiota81dbe302013-11-21 15:35:38 -0800115 /**
116 * Log time and size of last sent data.
117 * @param current Time to be sent.
118 * @param size Size of sent data (in bytes).
119 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800120 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800122 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123 }
124
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700125 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800126
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 private OFMessageDamper messageDamper = null;
128 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700129
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 private FloodlightContext context = null;
131 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800132
133 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800134 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800135 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800136 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
137 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800139 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140
Naoki Shiota8739faa2013-11-18 17:00:25 -0800141 /**
142 * Main thread that reads messages from queues and sends them to switches.
143 * @author Naoki Shiota
144 *
145 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800146 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800147 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800148 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800149
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800150 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800151 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800152
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700153 @Override
154 public void run() {
155 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800156 try {
157 // wait for message pushed to queue
158 mutex.acquire();
159 } catch (InterruptedException e) {
160 e.printStackTrace();
161 log.debug("FlowPusherThread is interrupted");
162 return;
163 }
164
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800165 // for safety of concurrent access, copy all key objects
166 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800167 synchronized (queues) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800168 for (IOFSwitch sw : queues.keySet()) {
169 keys.add(sw);
170 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800171 }
172
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800173 for (IOFSwitch sw : keys) {
174 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800175
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700176 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800177 if (sw == null || queue == null ||
178 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700179 continue;
180 }
181
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800183 processQueue(sw, queue, MAX_MESSAGE_SEND);
184 if (queue.isEmpty()) {
185 // remove queue if flagged to be.
186 if (queue.toBeDeleted) {
187 synchronized (queues) {
188 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800189 }
190 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800191 } else {
192 // if some messages remains in queue, latch down
193 if (mutex.availablePermits() == 0) {
194 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800195 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700196 }
197 }
198 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700199 }
200 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800201
202 /**
203 * Read messages from queue and send them to the switch.
204 * If number of messages excess the limit, stop sending messages.
205 * @param sw Switch to which messages will be sent.
206 * @param queue Queue of messages.
207 * @param max_msg Limitation of number of messages to be sent. If set to 0,
208 * all messages in queue will be sent.
209 */
210 private void processQueue(IOFSwitch sw, SwitchQueue queue, long max_msg) {
211 // check sending rate and determine it to be sent or not
212 long current_time = System.currentTimeMillis();
213 long size = 0;
214
215 if (queue.isSendable(current_time)) {
216 int i = 0;
217 while (! queue.isEmpty()) {
218 // Number of messages excess the limit
219 if (0 < max_msg && max_msg <= i) {
220 break;
221 }
222 ++i;
223
224 OFMessage msg = queue.poll();
225 try {
226 messageDamper.write(sw, msg, context);
227 log.debug("Pusher sends message : {}", msg);
228 size += msg.getLength();
229 } catch (IOException e) {
230 e.printStackTrace();
231 log.error("Exception in sending message ({}) : {}", msg, e);
232 }
233 }
234 sw.flush();
235 queue.logSentData(current_time, size);
236 }
237 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700238 }
239
Naoki Shiotac1601d32013-11-20 10:47:34 -0800240 /**
241 * Initialize object with one thread.
242 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800243 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800244 }
245
Naoki Shiotac1601d32013-11-20 10:47:34 -0800246 /**
247 * Initialize object with threads of given number.
248 * @param number_thread Number of threads to handle messages.
249 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800250 public FlowPusher(int number_thread) {
251 this.number_thread = number_thread;
252 }
253
Naoki Shiotac1601d32013-11-20 10:47:34 -0800254 /**
255 * Set parameters needed for sending messages.
256 * @param context FloodlightContext used for sending messages.
257 * If null, FlowPusher uses default context.
258 * @param modContext FloodlightModuleContext used for acquiring
259 * ThreadPoolService and registering MessageListener.
260 * @param factory Factory object to create OFMessage objects.
261 * @param damper Message damper used for sending messages.
262 * If null, FlowPusher creates its own damper object.
263 */
264 public void init(FloodlightContext context,
265 FloodlightModuleContext modContext,
266 BasicFactory factory,
267 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700268 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800269 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800270 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
271 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
272 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800273
274 if (damper != null) {
275 messageDamper = damper;
276 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800277 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800278 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
279 EnumSet.of(OFType.FLOW_MOD),
280 OFMESSAGE_DAMPER_TIMEOUT);
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800283
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284 /**
285 * Begin processing queue.
286 */
287 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800288 if (factory == null) {
289 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 return;
291 }
292
Naoki Shiota81dbe302013-11-21 15:35:38 -0800293 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800294 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800295 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800296
Naoki Shiota81dbe302013-11-21 15:35:38 -0800297 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800298 thread.start();
299 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700300 }
301
Brian O'Connor8c166a72013-11-14 18:41:48 -0800302 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800303 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800304 SwitchQueue queue = getQueue(sw);
305
306 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800307 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800308 }
309
310 synchronized (queue) {
311 if (queue.state == QueueState.READY) {
312 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800313 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
317 }
318
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 SwitchQueue queue = getQueue(sw);
322
323 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800324 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800325 }
326
327 synchronized (queue) {
328 if (queue.state == QueueState.SUSPENDED) {
329 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800330 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800331 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800332 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800333 }
334 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800335
Brian O'Connor8c166a72013-11-14 18:41:48 -0800336 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800337 public boolean isSuspended(IOFSwitch sw) {
338 SwitchQueue queue = getQueue(sw);
339
340 if (queue == null) {
341 // TODO Is true suitable for this case?
342 return true;
343 }
344
345 return (queue.state == QueueState.SUSPENDED);
346 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800347
348 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800349 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800350 */
351 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800352 if (threadMap == null) {
353 return;
354 }
355
Naoki Shiota81dbe302013-11-21 15:35:38 -0800356 for (FlowPusherThread t : threadMap.values()) {
357 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800358 }
359 }
360
Naoki Shiotae3199732013-11-25 16:14:43 -0800361 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800362 public void setRate(IOFSwitch sw, long rate) {
363 SwitchQueue queue = getQueue(sw);
364 if (queue == null) {
365 return;
366 }
367
368 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800369 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800370 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700371 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700372 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800373
374 @Override
375 public boolean createQueue(IOFSwitch sw) {
376 SwitchQueue queue = getQueue(sw);
377 if (queue != null) {
378 return false;
379 }
380
381 FlowPusherThread proc = getProcess(sw);
382 queue = new SwitchQueue();
383 queue.state = QueueState.READY;
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800384 synchronized (proc.queues) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800385 proc.queues.put(sw, queue);
386 }
387
388 return true;
389 }
390
391 @Override
392 public boolean deleteQueue(IOFSwitch sw) {
393 return deleteQueue(sw, false);
394 }
395
396 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800397 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800398 FlowPusherThread proc = getProcess(sw);
399
Naoki Shiotab485d412013-11-26 12:04:19 -0800400 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800401 synchronized (proc.queues) {
402 SwitchQueue queue = proc.queues.remove(sw);
403 if (queue == null) {
404 return false;
405 }
406 }
407 return true;
408 } else {
409 SwitchQueue queue = getQueue(sw);
410 if (queue == null) {
411 return false;
412 }
413 synchronized (queue) {
414 queue.toBeDeleted = true;
415 }
416 return true;
417 }
418 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700419
Brian O'Connor8c166a72013-11-14 18:41:48 -0800420 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800421 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800422 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800423 SwitchQueue queue = proc.queues.get(sw);
424
Naoki Shiotab485d412013-11-26 12:04:19 -0800425 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800426 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800427 createQueue(sw);
428 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800429 }
430
431 synchronized (queue) {
432 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800433 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800434 }
435
Naoki Shiota81dbe302013-11-21 15:35:38 -0800436 if (proc.mutex.availablePermits() == 0) {
437 proc.mutex.release();
438 }
439
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800440 return true;
441 }
442
Brian O'Connor8c166a72013-11-14 18:41:48 -0800443 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800444 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800445 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800446 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
447 if (flowEntryIdStr == null)
448 return false;
449 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
450 String userState = flowEntryObj.getUserState();
451 if (userState == null)
452 return false;
453
454 //
455 // Create the Open Flow Flow Modification Entry to push
456 //
457 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
458 long cookie = flowEntryId.value();
459
460 short flowModCommand = OFFlowMod.OFPFC_ADD;
461 if (userState.equals("FE_USER_ADD")) {
462 flowModCommand = OFFlowMod.OFPFC_ADD;
463 } else if (userState.equals("FE_USER_MODIFY")) {
464 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
465 } else if (userState.equals("FE_USER_DELETE")) {
466 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
467 } else {
468 // Unknown user state. Ignore the entry
469 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
470 flowEntryId.toString(), userState);
471 return false;
472 }
473
474 //
475 // Fetch the match conditions.
476 //
477 // NOTE: The Flow matching conditions common for all Flow Entries are
478 // used ONLY if a Flow Entry does NOT have the corresponding matching
479 // condition set.
480 //
481 OFMatch match = new OFMatch();
482 match.setWildcards(OFMatch.OFPFW_ALL);
483
484 // Match the Incoming Port
485 Short matchInPort = flowEntryObj.getMatchInPort();
486 if (matchInPort != null) {
487 match.setInputPort(matchInPort);
488 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
489 }
490
491 // Match the Source MAC address
492 String matchSrcMac = flowEntryObj.getMatchSrcMac();
493 if (matchSrcMac == null)
494 matchSrcMac = flowObj.getMatchSrcMac();
495 if (matchSrcMac != null) {
496 match.setDataLayerSource(matchSrcMac);
497 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
498 }
499
500 // Match the Destination MAC address
501 String matchDstMac = flowEntryObj.getMatchDstMac();
502 if (matchDstMac == null)
503 matchDstMac = flowObj.getMatchDstMac();
504 if (matchDstMac != null) {
505 match.setDataLayerDestination(matchDstMac);
506 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
507 }
508
509 // Match the Ethernet Frame Type
510 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
511 if (matchEthernetFrameType == null)
512 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
513 if (matchEthernetFrameType != null) {
514 match.setDataLayerType(matchEthernetFrameType);
515 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
516 }
517
518 // Match the VLAN ID
519 Short matchVlanId = flowEntryObj.getMatchVlanId();
520 if (matchVlanId == null)
521 matchVlanId = flowObj.getMatchVlanId();
522 if (matchVlanId != null) {
523 match.setDataLayerVirtualLan(matchVlanId);
524 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
525 }
526
527 // Match the VLAN priority
528 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
529 if (matchVlanPriority == null)
530 matchVlanPriority = flowObj.getMatchVlanPriority();
531 if (matchVlanPriority != null) {
532 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
533 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
534 }
535
536 // Match the Source IPv4 Network prefix
537 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
538 if (matchSrcIPv4Net == null)
539 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
540 if (matchSrcIPv4Net != null) {
541 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
542 }
543
544 // Match the Destination IPv4 Network prefix
545 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
546 if (matchDstIPv4Net == null)
547 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
548 if (matchDstIPv4Net != null) {
549 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
550 }
551
552 // Match the IP protocol
553 Byte matchIpProto = flowEntryObj.getMatchIpProto();
554 if (matchIpProto == null)
555 matchIpProto = flowObj.getMatchIpProto();
556 if (matchIpProto != null) {
557 match.setNetworkProtocol(matchIpProto);
558 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
559 }
560
561 // Match the IP ToS (DSCP field, 6 bits)
562 Byte matchIpToS = flowEntryObj.getMatchIpToS();
563 if (matchIpToS == null)
564 matchIpToS = flowObj.getMatchIpToS();
565 if (matchIpToS != null) {
566 match.setNetworkTypeOfService(matchIpToS);
567 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
568 }
569
570 // Match the Source TCP/UDP port
571 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
572 if (matchSrcTcpUdpPort == null)
573 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
574 if (matchSrcTcpUdpPort != null) {
575 match.setTransportSource(matchSrcTcpUdpPort);
576 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
577 }
578
579 // Match the Destination TCP/UDP port
580 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
581 if (matchDstTcpUdpPort == null)
582 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
583 if (matchDstTcpUdpPort != null) {
584 match.setTransportDestination(matchDstTcpUdpPort);
585 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
586 }
587
588 //
589 // Fetch the actions
590 //
591 Short actionOutputPort = null;
592 List<OFAction> openFlowActions = new ArrayList<OFAction>();
593 int actionsLen = 0;
594 FlowEntryActions flowEntryActions = null;
595 String actionsStr = flowEntryObj.getActions();
596 if (actionsStr != null)
597 flowEntryActions = new FlowEntryActions(actionsStr);
598 else
599 flowEntryActions = new FlowEntryActions();
600 for (FlowEntryAction action : flowEntryActions.actions()) {
601 ActionOutput actionOutput = action.actionOutput();
602 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
603 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
604 ActionStripVlan actionStripVlan = action.actionStripVlan();
605 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
606 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
607 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
608 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
609 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
610 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
611 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
612 ActionEnqueue actionEnqueue = action.actionEnqueue();
613
614 if (actionOutput != null) {
615 actionOutputPort = actionOutput.port().value();
616 // XXX: The max length is hard-coded for now
617 OFActionOutput ofa =
618 new OFActionOutput(actionOutput.port().value(),
619 (short)0xffff);
620 openFlowActions.add(ofa);
621 actionsLen += ofa.getLength();
622 }
623
624 if (actionSetVlanId != null) {
625 OFActionVirtualLanIdentifier ofa =
626 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
627 openFlowActions.add(ofa);
628 actionsLen += ofa.getLength();
629 }
630
631 if (actionSetVlanPriority != null) {
632 OFActionVirtualLanPriorityCodePoint ofa =
633 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
634 openFlowActions.add(ofa);
635 actionsLen += ofa.getLength();
636 }
637
638 if (actionStripVlan != null) {
639 if (actionStripVlan.stripVlan() == true) {
640 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
641 openFlowActions.add(ofa);
642 actionsLen += ofa.getLength();
643 }
644 }
645
646 if (actionSetEthernetSrcAddr != null) {
647 OFActionDataLayerSource ofa =
648 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
649 openFlowActions.add(ofa);
650 actionsLen += ofa.getLength();
651 }
652
653 if (actionSetEthernetDstAddr != null) {
654 OFActionDataLayerDestination ofa =
655 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
656 openFlowActions.add(ofa);
657 actionsLen += ofa.getLength();
658 }
659
660 if (actionSetIPv4SrcAddr != null) {
661 OFActionNetworkLayerSource ofa =
662 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
663 openFlowActions.add(ofa);
664 actionsLen += ofa.getLength();
665 }
666
667 if (actionSetIPv4DstAddr != null) {
668 OFActionNetworkLayerDestination ofa =
669 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
670 openFlowActions.add(ofa);
671 actionsLen += ofa.getLength();
672 }
673
674 if (actionSetIpToS != null) {
675 OFActionNetworkTypeOfService ofa =
676 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
677 openFlowActions.add(ofa);
678 actionsLen += ofa.getLength();
679 }
680
681 if (actionSetTcpUdpSrcPort != null) {
682 OFActionTransportLayerSource ofa =
683 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
684 openFlowActions.add(ofa);
685 actionsLen += ofa.getLength();
686 }
687
688 if (actionSetTcpUdpDstPort != null) {
689 OFActionTransportLayerDestination ofa =
690 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
691 openFlowActions.add(ofa);
692 actionsLen += ofa.getLength();
693 }
694
695 if (actionEnqueue != null) {
696 OFActionEnqueue ofa =
697 new OFActionEnqueue(actionEnqueue.port().value(),
698 actionEnqueue.queueId());
699 openFlowActions.add(ofa);
700 actionsLen += ofa.getLength();
701 }
702 }
703
704 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
705 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
706 .setPriority(PRIORITY_DEFAULT)
707 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
708 .setCookie(cookie)
709 .setCommand(flowModCommand)
710 .setMatch(match)
711 .setActions(openFlowActions)
712 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
713 fm.setOutPort(OFPort.OFPP_NONE.getValue());
714 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
715 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
716 if (actionOutputPort != null)
717 fm.setOutPort(actionOutputPort);
718 }
719
720 //
721 // TODO: Set the following flag
722 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
723 // See method ForwardingBase::pushRoute()
724 //
725
726 //
727 // Write the message to the switch
728 //
729 log.debug("MEASUREMENT: Installing flow entry " + userState +
730 " into switch DPID: " +
731 sw.getStringId() +
732 " flowEntryId: " + flowEntryId.toString() +
733 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
734 " inPort: " + matchInPort + " outPort: " + actionOutputPort
735 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800736 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800737 //
738 // TODO: We should use the OpenFlow Barrier mechanism
739 // to check for errors, and update the SwitchState
740 // for a flow entry after the Barrier message is
741 // is received.
742 //
743 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
744
745 return true;
746 }
747
Brian O'Connor8c166a72013-11-14 18:41:48 -0800748 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800749 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
750 //
751 // Create the OpenFlow Flow Modification Entry to push
752 //
753 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
754 long cookie = flowEntry.flowEntryId().value();
755
756 short flowModCommand = OFFlowMod.OFPFC_ADD;
757 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
758 flowModCommand = OFFlowMod.OFPFC_ADD;
759 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
760 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
761 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
762 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
763 } else {
764 // Unknown user state. Ignore the entry
765 log.debug(
766 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
767 flowEntry.flowEntryId().toString(),
768 flowEntry.flowEntryUserState());
769 return false;
770 }
771
772 //
773 // Fetch the match conditions.
774 //
775 // NOTE: The Flow matching conditions common for all Flow Entries are
776 // used ONLY if a Flow Entry does NOT have the corresponding matching
777 // condition set.
778 //
779 OFMatch match = new OFMatch();
780 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800781 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
782
783 // Match the Incoming Port
784 Port matchInPort = flowEntryMatch.inPort();
785 if (matchInPort != null) {
786 match.setInputPort(matchInPort.value());
787 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
788 }
789
790 // Match the Source MAC address
791 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800792 if (matchSrcMac != null) {
793 match.setDataLayerSource(matchSrcMac.toString());
794 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
795 }
796
797 // Match the Destination MAC address
798 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800799 if (matchDstMac != null) {
800 match.setDataLayerDestination(matchDstMac.toString());
801 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
802 }
803
804 // Match the Ethernet Frame Type
805 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800806 if (matchEthernetFrameType != null) {
807 match.setDataLayerType(matchEthernetFrameType);
808 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
809 }
810
811 // Match the VLAN ID
812 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800813 if (matchVlanId != null) {
814 match.setDataLayerVirtualLan(matchVlanId);
815 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
816 }
817
818 // Match the VLAN priority
819 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800820 if (matchVlanPriority != null) {
821 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
822 match.setWildcards(match.getWildcards()
823 & ~OFMatch.OFPFW_DL_VLAN_PCP);
824 }
825
826 // Match the Source IPv4 Network prefix
827 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800828 if (matchSrcIPv4Net != null) {
829 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
830 }
831
832 // Natch the Destination IPv4 Network prefix
833 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800834 if (matchDstIPv4Net != null) {
835 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
836 }
837
838 // Match the IP protocol
839 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800840 if (matchIpProto != null) {
841 match.setNetworkProtocol(matchIpProto);
842 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
843 }
844
845 // Match the IP ToS (DSCP field, 6 bits)
846 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800847 if (matchIpToS != null) {
848 match.setNetworkTypeOfService(matchIpToS);
849 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
850 }
851
852 // Match the Source TCP/UDP port
853 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800854 if (matchSrcTcpUdpPort != null) {
855 match.setTransportSource(matchSrcTcpUdpPort);
856 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
857 }
858
859 // Match the Destination TCP/UDP port
860 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800861 if (matchDstTcpUdpPort != null) {
862 match.setTransportDestination(matchDstTcpUdpPort);
863 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
864 }
865
866 //
867 // Fetch the actions
868 //
869 Short actionOutputPort = null;
870 List<OFAction> openFlowActions = new ArrayList<OFAction>();
871 int actionsLen = 0;
872 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
873 //
874 for (FlowEntryAction action : flowEntryActions.actions()) {
875 ActionOutput actionOutput = action.actionOutput();
876 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
877 ActionSetVlanPriority actionSetVlanPriority = action
878 .actionSetVlanPriority();
879 ActionStripVlan actionStripVlan = action.actionStripVlan();
880 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
881 .actionSetEthernetSrcAddr();
882 ActionSetEthernetAddr actionSetEthernetDstAddr = action
883 .actionSetEthernetDstAddr();
884 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
885 .actionSetIPv4SrcAddr();
886 ActionSetIPv4Addr actionSetIPv4DstAddr = action
887 .actionSetIPv4DstAddr();
888 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
889 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
890 .actionSetTcpUdpSrcPort();
891 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
892 .actionSetTcpUdpDstPort();
893 ActionEnqueue actionEnqueue = action.actionEnqueue();
894
895 if (actionOutput != null) {
896 actionOutputPort = actionOutput.port().value();
897 // XXX: The max length is hard-coded for now
898 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
899 .value(), (short) 0xffff);
900 openFlowActions.add(ofa);
901 actionsLen += ofa.getLength();
902 }
903
904 if (actionSetVlanId != null) {
905 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
906 actionSetVlanId.vlanId());
907 openFlowActions.add(ofa);
908 actionsLen += ofa.getLength();
909 }
910
911 if (actionSetVlanPriority != null) {
912 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
913 actionSetVlanPriority.vlanPriority());
914 openFlowActions.add(ofa);
915 actionsLen += ofa.getLength();
916 }
917
918 if (actionStripVlan != null) {
919 if (actionStripVlan.stripVlan() == true) {
920 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
921 openFlowActions.add(ofa);
922 actionsLen += ofa.getLength();
923 }
924 }
925
926 if (actionSetEthernetSrcAddr != null) {
927 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
928 actionSetEthernetSrcAddr.addr().toBytes());
929 openFlowActions.add(ofa);
930 actionsLen += ofa.getLength();
931 }
932
933 if (actionSetEthernetDstAddr != null) {
934 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
935 actionSetEthernetDstAddr.addr().toBytes());
936 openFlowActions.add(ofa);
937 actionsLen += ofa.getLength();
938 }
939
940 if (actionSetIPv4SrcAddr != null) {
941 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
942 actionSetIPv4SrcAddr.addr().value());
943 openFlowActions.add(ofa);
944 actionsLen += ofa.getLength();
945 }
946
947 if (actionSetIPv4DstAddr != null) {
948 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
949 actionSetIPv4DstAddr.addr().value());
950 openFlowActions.add(ofa);
951 actionsLen += ofa.getLength();
952 }
953
954 if (actionSetIpToS != null) {
955 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
956 actionSetIpToS.ipToS());
957 openFlowActions.add(ofa);
958 actionsLen += ofa.getLength();
959 }
960
961 if (actionSetTcpUdpSrcPort != null) {
962 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
963 actionSetTcpUdpSrcPort.port());
964 openFlowActions.add(ofa);
965 actionsLen += ofa.getLength();
966 }
967
968 if (actionSetTcpUdpDstPort != null) {
969 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
970 actionSetTcpUdpDstPort.port());
971 openFlowActions.add(ofa);
972 actionsLen += ofa.getLength();
973 }
974
975 if (actionEnqueue != null) {
976 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
977 .value(), actionEnqueue.queueId());
978 openFlowActions.add(ofa);
979 actionsLen += ofa.getLength();
980 }
981 }
982
983 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
984 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
985 .setPriority(PRIORITY_DEFAULT)
986 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
987 .setCommand(flowModCommand).setMatch(match)
988 .setActions(openFlowActions)
989 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
990 fm.setOutPort(OFPort.OFPP_NONE.getValue());
991 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
992 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
993 if (actionOutputPort != null)
994 fm.setOutPort(actionOutputPort);
995 }
996
997 //
998 // TODO: Set the following flag
999 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1000 // See method ForwardingBase::pushRoute()
1001 //
1002
1003 //
1004 // Write the message to the switch
1005 //
1006 log.debug("MEASUREMENT: Installing flow entry "
1007 + flowEntry.flowEntryUserState() + " into switch DPID: "
1008 + sw.getStringId() + " flowEntryId: "
1009 + flowEntry.flowEntryId().toString() + " srcMac: "
1010 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1011 + matchInPort + " outPort: " + actionOutputPort);
1012
1013 //
1014 // TODO: We should use the OpenFlow Barrier mechanism
1015 // to check for errors, and update the SwitchState
1016 // for a flow entry after the Barrier message is
1017 // is received.
1018 //
1019 // TODO: The FlowEntry Object in Titan should be set
1020 // to FE_SWITCH_UPDATED.
1021 //
1022
1023 return add(sw,fm);
1024 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001025
1026 @Override
1027 public OFBarrierReply barrier(IOFSwitch sw) {
1028 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1029 if (future == null) {
1030 return null;
1031 }
1032
1033 try {
1034 return future.get();
1035 } catch (InterruptedException e) {
1036 e.printStackTrace();
1037 log.error("InterruptedException: {}", e);
1038 return null;
1039 } catch (ExecutionException e) {
1040 e.printStackTrace();
1041 log.error("ExecutionException: {}", e);
1042 return null;
1043 }
1044 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001045
Naoki Shiotac1601d32013-11-20 10:47:34 -08001046 @Override
1047 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1048 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001049
1050 if (sw == null) {
1051 return null;
1052 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001053
Naoki Shiota81dbe302013-11-21 15:35:38 -08001054 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001055 msg.setXid(sw.getNextTransactionId());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001056
Naoki Shiotac1601d32013-11-20 10:47:34 -08001057 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001058 synchronized (barrierFutures) {
1059 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1060 if (map == null) {
1061 map = new HashMap<Integer,OFBarrierReplyFuture>();
1062 barrierFutures.put(sw.getId(), map);
1063 }
1064 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001065 }
1066
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001067 add(sw, msg);
1068
Naoki Shiotac1601d32013-11-20 10:47:34 -08001069 return future;
1070 }
1071
Naoki Shiotae3199732013-11-25 16:14:43 -08001072 /**
1073 * Get a queue attached to a switch.
1074 * @param sw Switch object
1075 * @return Queue object
1076 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001077 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001078 if (sw == null) {
1079 return null;
1080 }
1081
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001082 return getProcess(sw).queues.get(sw);
1083 }
1084
Naoki Shiotae3199732013-11-25 16:14:43 -08001085 /**
1086 * Get a hash value correspondent to a switch.
1087 * @param sw Switch object
1088 * @return Hash value
1089 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001090 protected long getHash(IOFSwitch sw) {
1091 // This code assumes DPID is sequentially assigned.
1092 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001093 return sw.getId() % number_thread;
1094 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001095
1096 /**
1097 * Get a Thread object which processes the queue attached to a switch.
1098 * @param sw Switch object
1099 * @return Thread object
1100 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001101 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001102 long hash = getHash(sw);
1103
1104 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001105 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001106
1107 @Override
1108 public String getName() {
1109 return "flowpusher";
1110 }
1111
1112 @Override
1113 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1114 return false;
1115 }
1116
1117 @Override
1118 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1119 return false;
1120 }
1121
1122 @Override
1123 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001124 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1125 if (map == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001126 log.debug("null map for {} : {}", sw.getId(), barrierFutures);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001127 return Command.CONTINUE;
1128 }
1129
1130 OFBarrierReplyFuture future = map.get(msg.getXid());
1131 if (future == null) {
Naoki Shiotac2ec5592013-11-27 12:10:33 -08001132 log.debug("null future for {} : {}", msg.getXid(), map);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001133 return Command.CONTINUE;
1134 }
1135
1136 log.debug("Received BARRIER_REPLY : {}", msg);
1137 future.deliverFuture(sw, msg);
1138
1139 return Command.CONTINUE;
1140 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001141
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001142}