blob: 6c7ac65f8bbdec7a66a0a5e768634196ac272420 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080011import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080012import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070013
Naoki Shiota7d0cf272013-11-05 10:18:12 -080014import org.openflow.protocol.*;
15import org.openflow.protocol.action.*;
16import org.openflow.protocol.factory.BasicFactory;
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
20import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080027import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080028import net.floodlightcontroller.util.OFMessageDamper;
29import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080033import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080034import net.onrc.onos.ofcontroller.util.FlowEntryActions;
35import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080036import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
37import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
38import net.onrc.onos.ofcontroller.util.FlowPath;
39import net.onrc.onos.ofcontroller.util.IPv4Net;
40import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070041
42/**
Naoki Shiotab485d412013-11-26 12:04:19 -080043 * FlowPusher is a implementation of FlowPusherService.
44 * FlowPusher assigns one message queue instance for each one switch.
45 * Number of message processing threads is configurable by constructor, and
46 * one thread can handle multiple message queues. Each queue will be assigned to
47 * a thread according to hash function defined by getHash().
48 * Each processing thread reads messages from queues and sends it to switches
49 * in round-robin. Processing thread also calculates rate of sending to suppress
50 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070051 * @author Naoki Shiota
52 *
53 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080054public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080055 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
56
57 // NOTE: Below are moved from FlowManager.
58 // TODO: Values copied from elsewhere (class LearningSwitch).
59 // The local copy should go away!
60 //
61 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
62 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080063
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080064 // Number of messages sent to switch at once
65 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080066
67 public static final short PRIORITY_DEFAULT = 100;
68 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
69 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
70
71 public enum QueueState {
72 READY,
73 SUSPENDED,
74 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070075
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080077 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080078 * This consists of queue itself and variables used for limiting sending rate.
79 * @author Naoki Shiota
80 *
81 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080082 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080083 private class SwitchQueue extends ArrayDeque<OFMessage> {
84 QueueState state;
85
Naoki Shiotae3199732013-11-25 16:14:43 -080086 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080087 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070088 long last_sent_time = 0;
89 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080090
Naoki Shiotae3199732013-11-25 16:14:43 -080091 // "To be deleted" flag
92 boolean toBeDeleted = false;
93
Naoki Shiota7d0cf272013-11-05 10:18:12 -080094 /**
95 * Check if sending rate is within the rate
96 * @param current Current time
97 * @return true if within the rate
98 */
99 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800100 if (max_rate == 0) {
101 // no limitation
102 return true;
103 }
104
Naoki Shiota81dbe302013-11-21 15:35:38 -0800105 if (current == last_sent_time) {
106 return false;
107 }
108
Naoki Shiotac1601d32013-11-20 10:47:34 -0800109 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800110 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800111 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800112 }
113
Naoki Shiota81dbe302013-11-21 15:35:38 -0800114 /**
115 * Log time and size of last sent data.
116 * @param current Time to be sent.
117 * @param size Size of sent data (in bytes).
118 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800119 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800120 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800121 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800122 }
123
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700124 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800125
Naoki Shiotac1601d32013-11-20 10:47:34 -0800126 private OFMessageDamper messageDamper = null;
127 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700128
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800129 private FloodlightContext context = null;
130 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800131
132 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800133 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800134 // Map of Future objects versus dpid and transaction ID.
Naoki Shiotac1601d32013-11-20 10:47:34 -0800135 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
136 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800137
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800138 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800139
Naoki Shiota8739faa2013-11-18 17:00:25 -0800140 /**
141 * Main thread that reads messages from queues and sends them to switches.
142 * @author Naoki Shiota
143 *
144 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800145 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800146 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotab485d412013-11-26 12:04:19 -0800147 = new HashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800148
Naoki Shiota81dbe302013-11-21 15:35:38 -0800149 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800150
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700151 @Override
152 public void run() {
153 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800154 try {
155 // wait for message pushed to queue
156 mutex.acquire();
157 } catch (InterruptedException e) {
158 e.printStackTrace();
159 log.debug("FlowPusherThread is interrupted");
160 return;
161 }
162
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800163 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
164 synchronized (queues) {
165 entries = queues.entrySet();
166 }
167
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800168 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800169 IOFSwitch sw = entry.getKey();
170 SwitchQueue queue = entry.getValue();
171
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700172 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800173 if (sw == null || queue == null ||
174 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700175 continue;
176 }
177
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800178 // check sending rate and determine it to be sent or not
Naoki Shiotab485d412013-11-26 12:04:19 -0800179 long current_time = System.currentTimeMillis();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800180 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800181
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800182 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800183 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800184 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800185 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800186 // Number of messages excess the limit
Naoki Shiota81dbe302013-11-21 15:35:38 -0800187 if (i >= MAX_MESSAGE_SEND) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800188 // Messages remains in queue
Naoki Shiota81dbe302013-11-21 15:35:38 -0800189 mutex.release();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800190 break;
191 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800192 ++i;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800193
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800194 OFMessage msg = queue.poll();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800195 try {
196 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800197 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800198 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800199 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800200 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800201 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800202 }
203 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800204 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800205 queue.logSentData(current_time, size);
Naoki Shiota81dbe302013-11-21 15:35:38 -0800206
Naoki Shiotae3199732013-11-25 16:14:43 -0800207 if (queue.isEmpty()) {
Naoki Shiotab485d412013-11-26 12:04:19 -0800208 // remove queue if flagged to be.
Naoki Shiotae3199732013-11-25 16:14:43 -0800209 if (queue.toBeDeleted) {
210 synchronized (queues) {
211 queues.remove(sw);
212 }
213 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800214 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700215 }
216 }
217 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700218 }
219 }
220 }
221
Naoki Shiotac1601d32013-11-20 10:47:34 -0800222 /**
223 * Initialize object with one thread.
224 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800225 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800226 }
227
Naoki Shiotac1601d32013-11-20 10:47:34 -0800228 /**
229 * Initialize object with threads of given number.
230 * @param number_thread Number of threads to handle messages.
231 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800232 public FlowPusher(int number_thread) {
233 this.number_thread = number_thread;
234 }
235
Naoki Shiotac1601d32013-11-20 10:47:34 -0800236 /**
237 * Set parameters needed for sending messages.
238 * @param context FloodlightContext used for sending messages.
239 * If null, FlowPusher uses default context.
240 * @param modContext FloodlightModuleContext used for acquiring
241 * ThreadPoolService and registering MessageListener.
242 * @param factory Factory object to create OFMessage objects.
243 * @param damper Message damper used for sending messages.
244 * If null, FlowPusher creates its own damper object.
245 */
246 public void init(FloodlightContext context,
247 FloodlightModuleContext modContext,
248 BasicFactory factory,
249 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700250 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800251 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800252 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
253 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
254 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800255
256 if (damper != null) {
257 messageDamper = damper;
258 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800259 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800260 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
261 EnumSet.of(OFType.FLOW_MOD),
262 OFMESSAGE_DAMPER_TIMEOUT);
263 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700264 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800265
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800266 /**
267 * Begin processing queue.
268 */
269 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800270 if (factory == null) {
271 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800272 return;
273 }
274
Naoki Shiota81dbe302013-11-21 15:35:38 -0800275 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800276 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800277 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800278
Naoki Shiota81dbe302013-11-21 15:35:38 -0800279 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800280 thread.start();
281 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 }
283
Brian O'Connor8c166a72013-11-14 18:41:48 -0800284 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800285 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800286 SwitchQueue queue = getQueue(sw);
287
288 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800289 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800290 }
291
292 synchronized (queue) {
293 if (queue.state == QueueState.READY) {
294 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800295 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800296 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800297 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800298 }
299 }
300
Brian O'Connor8c166a72013-11-14 18:41:48 -0800301 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800302 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800303 SwitchQueue queue = getQueue(sw);
304
305 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800306 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800307 }
308
309 synchronized (queue) {
310 if (queue.state == QueueState.SUSPENDED) {
311 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800312 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800314 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800315 }
316 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800317
Brian O'Connor8c166a72013-11-14 18:41:48 -0800318 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800319 public boolean isSuspended(IOFSwitch sw) {
320 SwitchQueue queue = getQueue(sw);
321
322 if (queue == null) {
323 // TODO Is true suitable for this case?
324 return true;
325 }
326
327 return (queue.state == QueueState.SUSPENDED);
328 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800329
330 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800331 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800332 */
333 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800334 if (threadMap == null) {
335 return;
336 }
337
Naoki Shiota81dbe302013-11-21 15:35:38 -0800338 for (FlowPusherThread t : threadMap.values()) {
339 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800340 }
341 }
342
Naoki Shiotae3199732013-11-25 16:14:43 -0800343 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800344 public void setRate(IOFSwitch sw, long rate) {
345 SwitchQueue queue = getQueue(sw);
346 if (queue == null) {
347 return;
348 }
349
350 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800351 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800352 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700353 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700354 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800355
356 @Override
357 public boolean createQueue(IOFSwitch sw) {
358 SwitchQueue queue = getQueue(sw);
359 if (queue != null) {
360 return false;
361 }
362
363 FlowPusherThread proc = getProcess(sw);
364 queue = new SwitchQueue();
365 queue.state = QueueState.READY;
366 synchronized (proc) {
367 proc.queues.put(sw, queue);
368 }
369
370 return true;
371 }
372
373 @Override
374 public boolean deleteQueue(IOFSwitch sw) {
375 return deleteQueue(sw, false);
376 }
377
378 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800379 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800380 FlowPusherThread proc = getProcess(sw);
381
Naoki Shiotab485d412013-11-26 12:04:19 -0800382 if (forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800383 synchronized (proc.queues) {
384 SwitchQueue queue = proc.queues.remove(sw);
385 if (queue == null) {
386 return false;
387 }
388 }
389 return true;
390 } else {
391 SwitchQueue queue = getQueue(sw);
392 if (queue == null) {
393 return false;
394 }
395 synchronized (queue) {
396 queue.toBeDeleted = true;
397 }
398 return true;
399 }
400 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700401
Brian O'Connor8c166a72013-11-14 18:41:48 -0800402 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800403 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800404 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800405 SwitchQueue queue = proc.queues.get(sw);
406
Naoki Shiotab485d412013-11-26 12:04:19 -0800407 // create queue at first addition of message
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800408 if (queue == null) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800409 createQueue(sw);
410 queue = getQueue(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800411 }
412
413 synchronized (queue) {
414 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800415 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800416 }
417
Naoki Shiota81dbe302013-11-21 15:35:38 -0800418 if (proc.mutex.availablePermits() == 0) {
419 proc.mutex.release();
420 }
421
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800422 return true;
423 }
424
Brian O'Connor8c166a72013-11-14 18:41:48 -0800425 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800426 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800427 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800428 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
429 if (flowEntryIdStr == null)
430 return false;
431 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
432 String userState = flowEntryObj.getUserState();
433 if (userState == null)
434 return false;
435
436 //
437 // Create the Open Flow Flow Modification Entry to push
438 //
439 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
440 long cookie = flowEntryId.value();
441
442 short flowModCommand = OFFlowMod.OFPFC_ADD;
443 if (userState.equals("FE_USER_ADD")) {
444 flowModCommand = OFFlowMod.OFPFC_ADD;
445 } else if (userState.equals("FE_USER_MODIFY")) {
446 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
447 } else if (userState.equals("FE_USER_DELETE")) {
448 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
449 } else {
450 // Unknown user state. Ignore the entry
451 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
452 flowEntryId.toString(), userState);
453 return false;
454 }
455
456 //
457 // Fetch the match conditions.
458 //
459 // NOTE: The Flow matching conditions common for all Flow Entries are
460 // used ONLY if a Flow Entry does NOT have the corresponding matching
461 // condition set.
462 //
463 OFMatch match = new OFMatch();
464 match.setWildcards(OFMatch.OFPFW_ALL);
465
466 // Match the Incoming Port
467 Short matchInPort = flowEntryObj.getMatchInPort();
468 if (matchInPort != null) {
469 match.setInputPort(matchInPort);
470 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
471 }
472
473 // Match the Source MAC address
474 String matchSrcMac = flowEntryObj.getMatchSrcMac();
475 if (matchSrcMac == null)
476 matchSrcMac = flowObj.getMatchSrcMac();
477 if (matchSrcMac != null) {
478 match.setDataLayerSource(matchSrcMac);
479 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
480 }
481
482 // Match the Destination MAC address
483 String matchDstMac = flowEntryObj.getMatchDstMac();
484 if (matchDstMac == null)
485 matchDstMac = flowObj.getMatchDstMac();
486 if (matchDstMac != null) {
487 match.setDataLayerDestination(matchDstMac);
488 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
489 }
490
491 // Match the Ethernet Frame Type
492 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
493 if (matchEthernetFrameType == null)
494 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
495 if (matchEthernetFrameType != null) {
496 match.setDataLayerType(matchEthernetFrameType);
497 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
498 }
499
500 // Match the VLAN ID
501 Short matchVlanId = flowEntryObj.getMatchVlanId();
502 if (matchVlanId == null)
503 matchVlanId = flowObj.getMatchVlanId();
504 if (matchVlanId != null) {
505 match.setDataLayerVirtualLan(matchVlanId);
506 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
507 }
508
509 // Match the VLAN priority
510 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
511 if (matchVlanPriority == null)
512 matchVlanPriority = flowObj.getMatchVlanPriority();
513 if (matchVlanPriority != null) {
514 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
515 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
516 }
517
518 // Match the Source IPv4 Network prefix
519 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
520 if (matchSrcIPv4Net == null)
521 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
522 if (matchSrcIPv4Net != null) {
523 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
524 }
525
526 // Match the Destination IPv4 Network prefix
527 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
528 if (matchDstIPv4Net == null)
529 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
530 if (matchDstIPv4Net != null) {
531 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
532 }
533
534 // Match the IP protocol
535 Byte matchIpProto = flowEntryObj.getMatchIpProto();
536 if (matchIpProto == null)
537 matchIpProto = flowObj.getMatchIpProto();
538 if (matchIpProto != null) {
539 match.setNetworkProtocol(matchIpProto);
540 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
541 }
542
543 // Match the IP ToS (DSCP field, 6 bits)
544 Byte matchIpToS = flowEntryObj.getMatchIpToS();
545 if (matchIpToS == null)
546 matchIpToS = flowObj.getMatchIpToS();
547 if (matchIpToS != null) {
548 match.setNetworkTypeOfService(matchIpToS);
549 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
550 }
551
552 // Match the Source TCP/UDP port
553 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
554 if (matchSrcTcpUdpPort == null)
555 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
556 if (matchSrcTcpUdpPort != null) {
557 match.setTransportSource(matchSrcTcpUdpPort);
558 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
559 }
560
561 // Match the Destination TCP/UDP port
562 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
563 if (matchDstTcpUdpPort == null)
564 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
565 if (matchDstTcpUdpPort != null) {
566 match.setTransportDestination(matchDstTcpUdpPort);
567 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
568 }
569
570 //
571 // Fetch the actions
572 //
573 Short actionOutputPort = null;
574 List<OFAction> openFlowActions = new ArrayList<OFAction>();
575 int actionsLen = 0;
576 FlowEntryActions flowEntryActions = null;
577 String actionsStr = flowEntryObj.getActions();
578 if (actionsStr != null)
579 flowEntryActions = new FlowEntryActions(actionsStr);
580 else
581 flowEntryActions = new FlowEntryActions();
582 for (FlowEntryAction action : flowEntryActions.actions()) {
583 ActionOutput actionOutput = action.actionOutput();
584 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
585 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
586 ActionStripVlan actionStripVlan = action.actionStripVlan();
587 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
588 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
589 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
590 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
591 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
592 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
593 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
594 ActionEnqueue actionEnqueue = action.actionEnqueue();
595
596 if (actionOutput != null) {
597 actionOutputPort = actionOutput.port().value();
598 // XXX: The max length is hard-coded for now
599 OFActionOutput ofa =
600 new OFActionOutput(actionOutput.port().value(),
601 (short)0xffff);
602 openFlowActions.add(ofa);
603 actionsLen += ofa.getLength();
604 }
605
606 if (actionSetVlanId != null) {
607 OFActionVirtualLanIdentifier ofa =
608 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
609 openFlowActions.add(ofa);
610 actionsLen += ofa.getLength();
611 }
612
613 if (actionSetVlanPriority != null) {
614 OFActionVirtualLanPriorityCodePoint ofa =
615 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
616 openFlowActions.add(ofa);
617 actionsLen += ofa.getLength();
618 }
619
620 if (actionStripVlan != null) {
621 if (actionStripVlan.stripVlan() == true) {
622 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
623 openFlowActions.add(ofa);
624 actionsLen += ofa.getLength();
625 }
626 }
627
628 if (actionSetEthernetSrcAddr != null) {
629 OFActionDataLayerSource ofa =
630 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
631 openFlowActions.add(ofa);
632 actionsLen += ofa.getLength();
633 }
634
635 if (actionSetEthernetDstAddr != null) {
636 OFActionDataLayerDestination ofa =
637 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
638 openFlowActions.add(ofa);
639 actionsLen += ofa.getLength();
640 }
641
642 if (actionSetIPv4SrcAddr != null) {
643 OFActionNetworkLayerSource ofa =
644 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
645 openFlowActions.add(ofa);
646 actionsLen += ofa.getLength();
647 }
648
649 if (actionSetIPv4DstAddr != null) {
650 OFActionNetworkLayerDestination ofa =
651 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
652 openFlowActions.add(ofa);
653 actionsLen += ofa.getLength();
654 }
655
656 if (actionSetIpToS != null) {
657 OFActionNetworkTypeOfService ofa =
658 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
659 openFlowActions.add(ofa);
660 actionsLen += ofa.getLength();
661 }
662
663 if (actionSetTcpUdpSrcPort != null) {
664 OFActionTransportLayerSource ofa =
665 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
666 openFlowActions.add(ofa);
667 actionsLen += ofa.getLength();
668 }
669
670 if (actionSetTcpUdpDstPort != null) {
671 OFActionTransportLayerDestination ofa =
672 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
673 openFlowActions.add(ofa);
674 actionsLen += ofa.getLength();
675 }
676
677 if (actionEnqueue != null) {
678 OFActionEnqueue ofa =
679 new OFActionEnqueue(actionEnqueue.port().value(),
680 actionEnqueue.queueId());
681 openFlowActions.add(ofa);
682 actionsLen += ofa.getLength();
683 }
684 }
685
686 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
687 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
688 .setPriority(PRIORITY_DEFAULT)
689 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
690 .setCookie(cookie)
691 .setCommand(flowModCommand)
692 .setMatch(match)
693 .setActions(openFlowActions)
694 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
695 fm.setOutPort(OFPort.OFPP_NONE.getValue());
696 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
697 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
698 if (actionOutputPort != null)
699 fm.setOutPort(actionOutputPort);
700 }
701
702 //
703 // TODO: Set the following flag
704 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
705 // See method ForwardingBase::pushRoute()
706 //
707
708 //
709 // Write the message to the switch
710 //
711 log.debug("MEASUREMENT: Installing flow entry " + userState +
712 " into switch DPID: " +
713 sw.getStringId() +
714 " flowEntryId: " + flowEntryId.toString() +
715 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
716 " inPort: " + matchInPort + " outPort: " + actionOutputPort
717 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800718 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800719 //
720 // TODO: We should use the OpenFlow Barrier mechanism
721 // to check for errors, and update the SwitchState
722 // for a flow entry after the Barrier message is
723 // is received.
724 //
725 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
726
727 return true;
728 }
729
Brian O'Connor8c166a72013-11-14 18:41:48 -0800730 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800731 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
732 //
733 // Create the OpenFlow Flow Modification Entry to push
734 //
735 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
736 long cookie = flowEntry.flowEntryId().value();
737
738 short flowModCommand = OFFlowMod.OFPFC_ADD;
739 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
740 flowModCommand = OFFlowMod.OFPFC_ADD;
741 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
742 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
743 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
744 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
745 } else {
746 // Unknown user state. Ignore the entry
747 log.debug(
748 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
749 flowEntry.flowEntryId().toString(),
750 flowEntry.flowEntryUserState());
751 return false;
752 }
753
754 //
755 // Fetch the match conditions.
756 //
757 // NOTE: The Flow matching conditions common for all Flow Entries are
758 // used ONLY if a Flow Entry does NOT have the corresponding matching
759 // condition set.
760 //
761 OFMatch match = new OFMatch();
762 match.setWildcards(OFMatch.OFPFW_ALL);
763 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
764 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
765
766 // Match the Incoming Port
767 Port matchInPort = flowEntryMatch.inPort();
768 if (matchInPort != null) {
769 match.setInputPort(matchInPort.value());
770 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
771 }
772
773 // Match the Source MAC address
774 MACAddress matchSrcMac = flowEntryMatch.srcMac();
775 if ((matchSrcMac == null) && (flowPathMatch != null)) {
776 matchSrcMac = flowPathMatch.srcMac();
777 }
778 if (matchSrcMac != null) {
779 match.setDataLayerSource(matchSrcMac.toString());
780 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
781 }
782
783 // Match the Destination MAC address
784 MACAddress matchDstMac = flowEntryMatch.dstMac();
785 if ((matchDstMac == null) && (flowPathMatch != null)) {
786 matchDstMac = flowPathMatch.dstMac();
787 }
788 if (matchDstMac != null) {
789 match.setDataLayerDestination(matchDstMac.toString());
790 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
791 }
792
793 // Match the Ethernet Frame Type
794 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
795 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
796 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
797 }
798 if (matchEthernetFrameType != null) {
799 match.setDataLayerType(matchEthernetFrameType);
800 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
801 }
802
803 // Match the VLAN ID
804 Short matchVlanId = flowEntryMatch.vlanId();
805 if ((matchVlanId == null) && (flowPathMatch != null)) {
806 matchVlanId = flowPathMatch.vlanId();
807 }
808 if (matchVlanId != null) {
809 match.setDataLayerVirtualLan(matchVlanId);
810 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
811 }
812
813 // Match the VLAN priority
814 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
815 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
816 matchVlanPriority = flowPathMatch.vlanPriority();
817 }
818 if (matchVlanPriority != null) {
819 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
820 match.setWildcards(match.getWildcards()
821 & ~OFMatch.OFPFW_DL_VLAN_PCP);
822 }
823
824 // Match the Source IPv4 Network prefix
825 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
826 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
827 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
828 }
829 if (matchSrcIPv4Net != null) {
830 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
831 }
832
833 // Natch the Destination IPv4 Network prefix
834 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
835 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
836 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
837 }
838 if (matchDstIPv4Net != null) {
839 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
840 }
841
842 // Match the IP protocol
843 Byte matchIpProto = flowEntryMatch.ipProto();
844 if ((matchIpProto == null) && (flowPathMatch != null)) {
845 matchIpProto = flowPathMatch.ipProto();
846 }
847 if (matchIpProto != null) {
848 match.setNetworkProtocol(matchIpProto);
849 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
850 }
851
852 // Match the IP ToS (DSCP field, 6 bits)
853 Byte matchIpToS = flowEntryMatch.ipToS();
854 if ((matchIpToS == null) && (flowPathMatch != null)) {
855 matchIpToS = flowPathMatch.ipToS();
856 }
857 if (matchIpToS != null) {
858 match.setNetworkTypeOfService(matchIpToS);
859 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
860 }
861
862 // Match the Source TCP/UDP port
863 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
864 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
865 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
866 }
867 if (matchSrcTcpUdpPort != null) {
868 match.setTransportSource(matchSrcTcpUdpPort);
869 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
870 }
871
872 // Match the Destination TCP/UDP port
873 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
874 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
875 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
876 }
877 if (matchDstTcpUdpPort != null) {
878 match.setTransportDestination(matchDstTcpUdpPort);
879 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
880 }
881
882 //
883 // Fetch the actions
884 //
885 Short actionOutputPort = null;
886 List<OFAction> openFlowActions = new ArrayList<OFAction>();
887 int actionsLen = 0;
888 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
889 //
890 for (FlowEntryAction action : flowEntryActions.actions()) {
891 ActionOutput actionOutput = action.actionOutput();
892 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
893 ActionSetVlanPriority actionSetVlanPriority = action
894 .actionSetVlanPriority();
895 ActionStripVlan actionStripVlan = action.actionStripVlan();
896 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
897 .actionSetEthernetSrcAddr();
898 ActionSetEthernetAddr actionSetEthernetDstAddr = action
899 .actionSetEthernetDstAddr();
900 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
901 .actionSetIPv4SrcAddr();
902 ActionSetIPv4Addr actionSetIPv4DstAddr = action
903 .actionSetIPv4DstAddr();
904 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
905 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
906 .actionSetTcpUdpSrcPort();
907 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
908 .actionSetTcpUdpDstPort();
909 ActionEnqueue actionEnqueue = action.actionEnqueue();
910
911 if (actionOutput != null) {
912 actionOutputPort = actionOutput.port().value();
913 // XXX: The max length is hard-coded for now
914 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
915 .value(), (short) 0xffff);
916 openFlowActions.add(ofa);
917 actionsLen += ofa.getLength();
918 }
919
920 if (actionSetVlanId != null) {
921 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
922 actionSetVlanId.vlanId());
923 openFlowActions.add(ofa);
924 actionsLen += ofa.getLength();
925 }
926
927 if (actionSetVlanPriority != null) {
928 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
929 actionSetVlanPriority.vlanPriority());
930 openFlowActions.add(ofa);
931 actionsLen += ofa.getLength();
932 }
933
934 if (actionStripVlan != null) {
935 if (actionStripVlan.stripVlan() == true) {
936 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
937 openFlowActions.add(ofa);
938 actionsLen += ofa.getLength();
939 }
940 }
941
942 if (actionSetEthernetSrcAddr != null) {
943 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
944 actionSetEthernetSrcAddr.addr().toBytes());
945 openFlowActions.add(ofa);
946 actionsLen += ofa.getLength();
947 }
948
949 if (actionSetEthernetDstAddr != null) {
950 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
951 actionSetEthernetDstAddr.addr().toBytes());
952 openFlowActions.add(ofa);
953 actionsLen += ofa.getLength();
954 }
955
956 if (actionSetIPv4SrcAddr != null) {
957 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
958 actionSetIPv4SrcAddr.addr().value());
959 openFlowActions.add(ofa);
960 actionsLen += ofa.getLength();
961 }
962
963 if (actionSetIPv4DstAddr != null) {
964 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
965 actionSetIPv4DstAddr.addr().value());
966 openFlowActions.add(ofa);
967 actionsLen += ofa.getLength();
968 }
969
970 if (actionSetIpToS != null) {
971 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
972 actionSetIpToS.ipToS());
973 openFlowActions.add(ofa);
974 actionsLen += ofa.getLength();
975 }
976
977 if (actionSetTcpUdpSrcPort != null) {
978 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
979 actionSetTcpUdpSrcPort.port());
980 openFlowActions.add(ofa);
981 actionsLen += ofa.getLength();
982 }
983
984 if (actionSetTcpUdpDstPort != null) {
985 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
986 actionSetTcpUdpDstPort.port());
987 openFlowActions.add(ofa);
988 actionsLen += ofa.getLength();
989 }
990
991 if (actionEnqueue != null) {
992 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
993 .value(), actionEnqueue.queueId());
994 openFlowActions.add(ofa);
995 actionsLen += ofa.getLength();
996 }
997 }
998
999 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
1000 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
1001 .setPriority(PRIORITY_DEFAULT)
1002 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
1003 .setCommand(flowModCommand).setMatch(match)
1004 .setActions(openFlowActions)
1005 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
1006 fm.setOutPort(OFPort.OFPP_NONE.getValue());
1007 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
1008 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
1009 if (actionOutputPort != null)
1010 fm.setOutPort(actionOutputPort);
1011 }
1012
1013 //
1014 // TODO: Set the following flag
1015 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
1016 // See method ForwardingBase::pushRoute()
1017 //
1018
1019 //
1020 // Write the message to the switch
1021 //
1022 log.debug("MEASUREMENT: Installing flow entry "
1023 + flowEntry.flowEntryUserState() + " into switch DPID: "
1024 + sw.getStringId() + " flowEntryId: "
1025 + flowEntry.flowEntryId().toString() + " srcMac: "
1026 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1027 + matchInPort + " outPort: " + actionOutputPort);
1028
1029 //
1030 // TODO: We should use the OpenFlow Barrier mechanism
1031 // to check for errors, and update the SwitchState
1032 // for a flow entry after the Barrier message is
1033 // is received.
1034 //
1035 // TODO: The FlowEntry Object in Titan should be set
1036 // to FE_SWITCH_UPDATED.
1037 //
1038
1039 return add(sw,fm);
1040 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001041
1042 @Override
1043 public OFBarrierReply barrier(IOFSwitch sw) {
1044 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1045 if (future == null) {
1046 return null;
1047 }
1048
1049 try {
1050 return future.get();
1051 } catch (InterruptedException e) {
1052 e.printStackTrace();
1053 log.error("InterruptedException: {}", e);
1054 return null;
1055 } catch (ExecutionException e) {
1056 e.printStackTrace();
1057 log.error("ExecutionException: {}", e);
1058 return null;
1059 }
1060 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001061
Naoki Shiotac1601d32013-11-20 10:47:34 -08001062 @Override
1063 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1064 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001065
1066 if (sw == null) {
1067 return null;
1068 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001069
Naoki Shiota81dbe302013-11-21 15:35:38 -08001070 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001071 msg.setXid(sw.getNextTransactionId());
1072 add(sw, msg);
1073
1074 // TODO create Future object of message
1075 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiota81dbe302013-11-21 15:35:38 -08001076
Naoki Shiotac1601d32013-11-20 10:47:34 -08001077 synchronized (barrierFutures) {
1078 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1079 if (map == null) {
1080 map = new HashMap<Integer,OFBarrierReplyFuture>();
1081 barrierFutures.put(sw.getId(), map);
1082 }
1083 map.put(msg.getXid(), future);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001084 }
1085
1086 return future;
1087 }
1088
Naoki Shiotae3199732013-11-25 16:14:43 -08001089 /**
1090 * Get a queue attached to a switch.
1091 * @param sw Switch object
1092 * @return Queue object
1093 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001094 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001095 if (sw == null) {
1096 return null;
1097 }
1098
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001099 return getProcess(sw).queues.get(sw);
1100 }
1101
Naoki Shiotae3199732013-11-25 16:14:43 -08001102 /**
1103 * Get a hash value correspondent to a switch.
1104 * @param sw Switch object
1105 * @return Hash value
1106 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001107 protected long getHash(IOFSwitch sw) {
1108 // This code assumes DPID is sequentially assigned.
1109 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001110 return sw.getId() % number_thread;
1111 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001112
1113 /**
1114 * Get a Thread object which processes the queue attached to a switch.
1115 * @param sw Switch object
1116 * @return Thread object
1117 */
Naoki Shiota81dbe302013-11-21 15:35:38 -08001118 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001119 long hash = getHash(sw);
1120
1121 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001122 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001123
1124 @Override
1125 public String getName() {
1126 return "flowpusher";
1127 }
1128
1129 @Override
1130 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1131 return false;
1132 }
1133
1134 @Override
1135 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1136 return false;
1137 }
1138
1139 @Override
1140 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001141 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1142 if (map == null) {
1143 return Command.CONTINUE;
1144 }
1145
1146 OFBarrierReplyFuture future = map.get(msg.getXid());
1147 if (future == null) {
1148 return Command.CONTINUE;
1149 }
1150
1151 log.debug("Received BARRIER_REPLY : {}", msg);
1152 future.deliverFuture(sw, msg);
1153
1154 return Command.CONTINUE;
1155 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001156
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001157}