blob: f43a83e1567dea888f797e5d3cfdb8856d00703b [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotac1601d32013-11-20 10:47:34 -080011import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080012import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070013
Naoki Shiota7d0cf272013-11-05 10:18:12 -080014import org.openflow.protocol.*;
15import org.openflow.protocol.action.*;
16import org.openflow.protocol.factory.BasicFactory;
17import org.slf4j.Logger;
18import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
20import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080027import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080028import net.floodlightcontroller.util.OFMessageDamper;
29import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
30import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
31import net.onrc.onos.ofcontroller.util.FlowEntryAction;
32import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080033import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080034import net.onrc.onos.ofcontroller.util.FlowEntryActions;
35import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080036import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
37import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
38import net.onrc.onos.ofcontroller.util.FlowPath;
39import net.onrc.onos.ofcontroller.util.IPv4Net;
40import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070041
42/**
Naoki Shiota8739faa2013-11-18 17:00:25 -080043 * FlowPusher intermediates FlowManager/FlowSynchronizer and switches to push OpenFlow
44 * messages to switches in proper rate.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045 * @author Naoki Shiota
46 *
47 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080048public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080049 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
50
Naoki Shiota81dbe302013-11-21 15:35:38 -080051 private static boolean barrierIfEmpty = false;
52
Naoki Shiota7d0cf272013-11-05 10:18:12 -080053 // NOTE: Below are moved from FlowManager.
54 // TODO: Values copied from elsewhere (class LearningSwitch).
55 // The local copy should go away!
56 //
57 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
58 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080059
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080060 // Interval of sleep when queue is empty
61 protected static final long SLEEP_MILLI_SEC = 10;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080062 protected static final int SLEEP_NANO_SEC = 0;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080063
64 // Number of messages sent to switch at once
65 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080066
67 public static final short PRIORITY_DEFAULT = 100;
68 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
69 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
70
71 public enum QueueState {
72 READY,
73 SUSPENDED,
74 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070075
Naoki Shiotac1601d32013-11-20 10:47:34 -080076 /**
77 * Message queue attached to a switch.
78 * This consists of queue itself and variables used for limiting sending rate.
79 * @author Naoki Shiota
80 *
81 */
Naoki Shiota8739faa2013-11-18 17:00:25 -080082 @SuppressWarnings("serial")
Naoki Shiota7d0cf272013-11-05 10:18:12 -080083 private class SwitchQueue extends ArrayDeque<OFMessage> {
84 QueueState state;
85
86 // Max rate of sending message (bytes/sec). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080087 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070088 long last_sent_time = 0;
89 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080090
91 /**
92 * Check if sending rate is within the rate
93 * @param current Current time
94 * @return true if within the rate
95 */
96 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080097 if (max_rate == 0) {
98 // no limitation
99 return true;
100 }
101
Naoki Shiota81dbe302013-11-21 15:35:38 -0800102 if (current == last_sent_time) {
103 return false;
104 }
105
Naoki Shiotac1601d32013-11-20 10:47:34 -0800106 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800108 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800109 }
110
Naoki Shiota81dbe302013-11-21 15:35:38 -0800111 /**
112 * Log time and size of last sent data.
113 * @param current Time to be sent.
114 * @param size Size of sent data (in bytes).
115 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800116 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800117 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800118 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800119 }
120
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700121 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800122
Naoki Shiotac1601d32013-11-20 10:47:34 -0800123 private OFMessageDamper messageDamper = null;
124 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700125
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800126 private FloodlightContext context = null;
127 private BasicFactory factory = null;
Naoki Shiota81dbe302013-11-21 15:35:38 -0800128 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800129 private Map<Long, Map<Integer, OFBarrierReplyFuture>>
130 barrierFutures = new HashMap<Long, Map<Integer, OFBarrierReplyFuture>>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800131
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800132 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800133
Naoki Shiota8739faa2013-11-18 17:00:25 -0800134 /**
135 * Main thread that reads messages from queues and sends them to switches.
136 * @author Naoki Shiota
137 *
138 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800139 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800140 private Map<IOFSwitch,SwitchQueue> queues
141 = new HashMap<IOFSwitch,SwitchQueue>();
142
Naoki Shiota81dbe302013-11-21 15:35:38 -0800143 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800144
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700145 @Override
146 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800147 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700148
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700149 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800150 try {
151 // wait for message pushed to queue
152 mutex.acquire();
153 } catch (InterruptedException e) {
154 e.printStackTrace();
155 log.debug("FlowPusherThread is interrupted");
156 return;
157 }
158
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800159 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
160 synchronized (queues) {
161 entries = queues.entrySet();
162 }
163
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800164 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800165 IOFSwitch sw = entry.getKey();
166 SwitchQueue queue = entry.getValue();
167
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700168 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800169 if (sw == null || queue == null ||
170 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700171 continue;
172 }
173
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800174 // check sending rate and determine it to be sent or not
175 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800176 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800177
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800178 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800179 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800180 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800181 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800182 // Number of messages excess the limit
Naoki Shiota81dbe302013-11-21 15:35:38 -0800183 if (i >= MAX_MESSAGE_SEND) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800184 // Messages remains in queue
Naoki Shiota81dbe302013-11-21 15:35:38 -0800185 mutex.release();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800186 break;
187 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800188 ++i;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800189
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800190 OFMessage msg = queue.poll();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800191 try {
192 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800193 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800194 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800195 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800196 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800197 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800198 }
199 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800200 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800201 queue.logSentData(current_time, size);
Naoki Shiota81dbe302013-11-21 15:35:38 -0800202
203 if (queue.isEmpty() && barrierIfEmpty) {
204 barrier(sw);
205 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700206 }
207 }
208 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700209 }
210 }
211 }
212
Naoki Shiotac1601d32013-11-20 10:47:34 -0800213 /**
214 * Initialize object with one thread.
215 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800216 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800217 }
218
Naoki Shiotac1601d32013-11-20 10:47:34 -0800219 /**
220 * Initialize object with threads of given number.
221 * @param number_thread Number of threads to handle messages.
222 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800223 public FlowPusher(int number_thread) {
224 this.number_thread = number_thread;
225 }
226
Naoki Shiotac1601d32013-11-20 10:47:34 -0800227 /**
228 * Set parameters needed for sending messages.
229 * @param context FloodlightContext used for sending messages.
230 * If null, FlowPusher uses default context.
231 * @param modContext FloodlightModuleContext used for acquiring
232 * ThreadPoolService and registering MessageListener.
233 * @param factory Factory object to create OFMessage objects.
234 * @param damper Message damper used for sending messages.
235 * If null, FlowPusher creates its own damper object.
236 */
237 public void init(FloodlightContext context,
238 FloodlightModuleContext modContext,
239 BasicFactory factory,
240 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700241 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800242 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800243 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
244 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
245 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800246
247 if (damper != null) {
248 messageDamper = damper;
249 } else {
250 // use default value
251 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
252 EnumSet.of(OFType.FLOW_MOD),
253 OFMESSAGE_DAMPER_TIMEOUT);
254 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700255 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800256
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800257 /**
258 * Begin processing queue.
259 */
260 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800261 if (factory == null) {
262 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800263 return;
264 }
265
Naoki Shiota81dbe302013-11-21 15:35:38 -0800266 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800267 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800268 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800269
Naoki Shiota81dbe302013-11-21 15:35:38 -0800270 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800271 thread.start();
272 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700273 }
274
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800275 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800276 * Suspend sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800277 * @param sw
278 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800279 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800280 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800281 SwitchQueue queue = getQueue(sw);
282
283 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800284 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800285 }
286
287 synchronized (queue) {
288 if (queue.state == QueueState.READY) {
289 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800290 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800291 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800292 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800293 }
294 }
295
296 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800297 * Resume sending messages to switch.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800298 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800299 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800300 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800301 SwitchQueue queue = getQueue(sw);
302
303 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800304 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800305 }
306
307 synchronized (queue) {
308 if (queue.state == QueueState.SUSPENDED) {
309 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800310 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800311 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800312 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 }
314 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800315
Naoki Shiota8739faa2013-11-18 17:00:25 -0800316 /**
317 * Check if given switch is suspended.
318 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800320 public boolean isSuspended(IOFSwitch sw) {
321 SwitchQueue queue = getQueue(sw);
322
323 if (queue == null) {
324 // TODO Is true suitable for this case?
325 return true;
326 }
327
328 return (queue.state == QueueState.SUSPENDED);
329 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800330
331 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800332 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800333 */
334 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800335 if (threadMap == null) {
336 return;
337 }
338
Naoki Shiota81dbe302013-11-21 15:35:38 -0800339 for (FlowPusherThread t : threadMap.values()) {
340 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800341 }
342 }
343
Naoki Shiota8739faa2013-11-18 17:00:25 -0800344 /**
345 * Set sending rate to a switch.
346 * @param sw Switch.
347 * @param rate Rate in bytes/sec.
348 */
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800349 public void setRate(IOFSwitch sw, long rate) {
350 SwitchQueue queue = getQueue(sw);
351 if (queue == null) {
352 return;
353 }
354
355 if (rate > 0) {
356 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700357 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700358 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700359
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800360 /**
Naoki Shiotac1601d32013-11-20 10:47:34 -0800361 * Add OFMessage to queue of the switch.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800362 * @param sw Switch to which message is sent.
363 * @param msg Message to be sent.
364 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800365 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800366 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800367 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800368 FlowPusherThread proc = getProcess(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800369 SwitchQueue queue = proc.queues.get(sw);
370
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800371 if (queue == null) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800372 queue = new SwitchQueue();
373 queue.state = QueueState.READY;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800374 synchronized (proc) {
375 proc.queues.put(sw, queue);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800376 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800377 }
378
379 synchronized (queue) {
380 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800381 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800382 }
383
Naoki Shiota81dbe302013-11-21 15:35:38 -0800384 if (proc.mutex.availablePermits() == 0) {
385 proc.mutex.release();
386 }
387
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800388 return true;
389 }
390
391 /**
392 * Create OFMessage from given flow information and add it to the queue.
Naoki Shiota8739faa2013-11-18 17:00:25 -0800393 * @param sw Switch to which message is sent.
394 * @param flowObj FlowPath.
395 * @param flowEntryObj FlowEntry.
396 * @return true if succeed.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800397 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800398 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800399 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800400 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800401 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
402 if (flowEntryIdStr == null)
403 return false;
404 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
405 String userState = flowEntryObj.getUserState();
406 if (userState == null)
407 return false;
408
409 //
410 // Create the Open Flow Flow Modification Entry to push
411 //
412 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
413 long cookie = flowEntryId.value();
414
415 short flowModCommand = OFFlowMod.OFPFC_ADD;
416 if (userState.equals("FE_USER_ADD")) {
417 flowModCommand = OFFlowMod.OFPFC_ADD;
418 } else if (userState.equals("FE_USER_MODIFY")) {
419 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
420 } else if (userState.equals("FE_USER_DELETE")) {
421 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
422 } else {
423 // Unknown user state. Ignore the entry
424 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
425 flowEntryId.toString(), userState);
426 return false;
427 }
428
429 //
430 // Fetch the match conditions.
431 //
432 // NOTE: The Flow matching conditions common for all Flow Entries are
433 // used ONLY if a Flow Entry does NOT have the corresponding matching
434 // condition set.
435 //
436 OFMatch match = new OFMatch();
437 match.setWildcards(OFMatch.OFPFW_ALL);
438
439 // Match the Incoming Port
440 Short matchInPort = flowEntryObj.getMatchInPort();
441 if (matchInPort != null) {
442 match.setInputPort(matchInPort);
443 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
444 }
445
446 // Match the Source MAC address
447 String matchSrcMac = flowEntryObj.getMatchSrcMac();
448 if (matchSrcMac == null)
449 matchSrcMac = flowObj.getMatchSrcMac();
450 if (matchSrcMac != null) {
451 match.setDataLayerSource(matchSrcMac);
452 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
453 }
454
455 // Match the Destination MAC address
456 String matchDstMac = flowEntryObj.getMatchDstMac();
457 if (matchDstMac == null)
458 matchDstMac = flowObj.getMatchDstMac();
459 if (matchDstMac != null) {
460 match.setDataLayerDestination(matchDstMac);
461 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
462 }
463
464 // Match the Ethernet Frame Type
465 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
466 if (matchEthernetFrameType == null)
467 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
468 if (matchEthernetFrameType != null) {
469 match.setDataLayerType(matchEthernetFrameType);
470 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
471 }
472
473 // Match the VLAN ID
474 Short matchVlanId = flowEntryObj.getMatchVlanId();
475 if (matchVlanId == null)
476 matchVlanId = flowObj.getMatchVlanId();
477 if (matchVlanId != null) {
478 match.setDataLayerVirtualLan(matchVlanId);
479 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
480 }
481
482 // Match the VLAN priority
483 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
484 if (matchVlanPriority == null)
485 matchVlanPriority = flowObj.getMatchVlanPriority();
486 if (matchVlanPriority != null) {
487 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
488 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
489 }
490
491 // Match the Source IPv4 Network prefix
492 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
493 if (matchSrcIPv4Net == null)
494 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
495 if (matchSrcIPv4Net != null) {
496 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
497 }
498
499 // Match the Destination IPv4 Network prefix
500 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
501 if (matchDstIPv4Net == null)
502 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
503 if (matchDstIPv4Net != null) {
504 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
505 }
506
507 // Match the IP protocol
508 Byte matchIpProto = flowEntryObj.getMatchIpProto();
509 if (matchIpProto == null)
510 matchIpProto = flowObj.getMatchIpProto();
511 if (matchIpProto != null) {
512 match.setNetworkProtocol(matchIpProto);
513 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
514 }
515
516 // Match the IP ToS (DSCP field, 6 bits)
517 Byte matchIpToS = flowEntryObj.getMatchIpToS();
518 if (matchIpToS == null)
519 matchIpToS = flowObj.getMatchIpToS();
520 if (matchIpToS != null) {
521 match.setNetworkTypeOfService(matchIpToS);
522 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
523 }
524
525 // Match the Source TCP/UDP port
526 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
527 if (matchSrcTcpUdpPort == null)
528 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
529 if (matchSrcTcpUdpPort != null) {
530 match.setTransportSource(matchSrcTcpUdpPort);
531 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
532 }
533
534 // Match the Destination TCP/UDP port
535 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
536 if (matchDstTcpUdpPort == null)
537 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
538 if (matchDstTcpUdpPort != null) {
539 match.setTransportDestination(matchDstTcpUdpPort);
540 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
541 }
542
543 //
544 // Fetch the actions
545 //
546 Short actionOutputPort = null;
547 List<OFAction> openFlowActions = new ArrayList<OFAction>();
548 int actionsLen = 0;
549 FlowEntryActions flowEntryActions = null;
550 String actionsStr = flowEntryObj.getActions();
551 if (actionsStr != null)
552 flowEntryActions = new FlowEntryActions(actionsStr);
553 else
554 flowEntryActions = new FlowEntryActions();
555 for (FlowEntryAction action : flowEntryActions.actions()) {
556 ActionOutput actionOutput = action.actionOutput();
557 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
558 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
559 ActionStripVlan actionStripVlan = action.actionStripVlan();
560 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
561 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
562 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
563 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
564 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
565 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
566 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
567 ActionEnqueue actionEnqueue = action.actionEnqueue();
568
569 if (actionOutput != null) {
570 actionOutputPort = actionOutput.port().value();
571 // XXX: The max length is hard-coded for now
572 OFActionOutput ofa =
573 new OFActionOutput(actionOutput.port().value(),
574 (short)0xffff);
575 openFlowActions.add(ofa);
576 actionsLen += ofa.getLength();
577 }
578
579 if (actionSetVlanId != null) {
580 OFActionVirtualLanIdentifier ofa =
581 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
582 openFlowActions.add(ofa);
583 actionsLen += ofa.getLength();
584 }
585
586 if (actionSetVlanPriority != null) {
587 OFActionVirtualLanPriorityCodePoint ofa =
588 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
589 openFlowActions.add(ofa);
590 actionsLen += ofa.getLength();
591 }
592
593 if (actionStripVlan != null) {
594 if (actionStripVlan.stripVlan() == true) {
595 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
596 openFlowActions.add(ofa);
597 actionsLen += ofa.getLength();
598 }
599 }
600
601 if (actionSetEthernetSrcAddr != null) {
602 OFActionDataLayerSource ofa =
603 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
604 openFlowActions.add(ofa);
605 actionsLen += ofa.getLength();
606 }
607
608 if (actionSetEthernetDstAddr != null) {
609 OFActionDataLayerDestination ofa =
610 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
611 openFlowActions.add(ofa);
612 actionsLen += ofa.getLength();
613 }
614
615 if (actionSetIPv4SrcAddr != null) {
616 OFActionNetworkLayerSource ofa =
617 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
618 openFlowActions.add(ofa);
619 actionsLen += ofa.getLength();
620 }
621
622 if (actionSetIPv4DstAddr != null) {
623 OFActionNetworkLayerDestination ofa =
624 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
625 openFlowActions.add(ofa);
626 actionsLen += ofa.getLength();
627 }
628
629 if (actionSetIpToS != null) {
630 OFActionNetworkTypeOfService ofa =
631 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
632 openFlowActions.add(ofa);
633 actionsLen += ofa.getLength();
634 }
635
636 if (actionSetTcpUdpSrcPort != null) {
637 OFActionTransportLayerSource ofa =
638 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
639 openFlowActions.add(ofa);
640 actionsLen += ofa.getLength();
641 }
642
643 if (actionSetTcpUdpDstPort != null) {
644 OFActionTransportLayerDestination ofa =
645 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
646 openFlowActions.add(ofa);
647 actionsLen += ofa.getLength();
648 }
649
650 if (actionEnqueue != null) {
651 OFActionEnqueue ofa =
652 new OFActionEnqueue(actionEnqueue.port().value(),
653 actionEnqueue.queueId());
654 openFlowActions.add(ofa);
655 actionsLen += ofa.getLength();
656 }
657 }
658
659 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
660 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
661 .setPriority(PRIORITY_DEFAULT)
662 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
663 .setCookie(cookie)
664 .setCommand(flowModCommand)
665 .setMatch(match)
666 .setActions(openFlowActions)
667 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
668 fm.setOutPort(OFPort.OFPP_NONE.getValue());
669 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
670 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
671 if (actionOutputPort != null)
672 fm.setOutPort(actionOutputPort);
673 }
674
675 //
676 // TODO: Set the following flag
677 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
678 // See method ForwardingBase::pushRoute()
679 //
680
681 //
682 // Write the message to the switch
683 //
684 log.debug("MEASUREMENT: Installing flow entry " + userState +
685 " into switch DPID: " +
686 sw.getStringId() +
687 " flowEntryId: " + flowEntryId.toString() +
688 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
689 " inPort: " + matchInPort + " outPort: " + actionOutputPort
690 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800691 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800692 //
693 // TODO: We should use the OpenFlow Barrier mechanism
694 // to check for errors, and update the SwitchState
695 // for a flow entry after the Barrier message is
696 // is received.
697 //
698 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
699
700 return true;
701 }
702
Naoki Shiota8739faa2013-11-18 17:00:25 -0800703 /**
704 * Create OFMessage from given flow information and add it to the queue.
705 * @param sw Switch to which message is sent.
706 * @param flowPath FlowPath.
707 * @param flowEntry FlowEntry.
708 * @return true if secceed.
709 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800710 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800711 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
712 //
713 // Create the OpenFlow Flow Modification Entry to push
714 //
715 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
716 long cookie = flowEntry.flowEntryId().value();
717
718 short flowModCommand = OFFlowMod.OFPFC_ADD;
719 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
720 flowModCommand = OFFlowMod.OFPFC_ADD;
721 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
722 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
723 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
724 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
725 } else {
726 // Unknown user state. Ignore the entry
727 log.debug(
728 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
729 flowEntry.flowEntryId().toString(),
730 flowEntry.flowEntryUserState());
731 return false;
732 }
733
734 //
735 // Fetch the match conditions.
736 //
737 // NOTE: The Flow matching conditions common for all Flow Entries are
738 // used ONLY if a Flow Entry does NOT have the corresponding matching
739 // condition set.
740 //
741 OFMatch match = new OFMatch();
742 match.setWildcards(OFMatch.OFPFW_ALL);
743 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
744 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
745
746 // Match the Incoming Port
747 Port matchInPort = flowEntryMatch.inPort();
748 if (matchInPort != null) {
749 match.setInputPort(matchInPort.value());
750 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
751 }
752
753 // Match the Source MAC address
754 MACAddress matchSrcMac = flowEntryMatch.srcMac();
755 if ((matchSrcMac == null) && (flowPathMatch != null)) {
756 matchSrcMac = flowPathMatch.srcMac();
757 }
758 if (matchSrcMac != null) {
759 match.setDataLayerSource(matchSrcMac.toString());
760 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
761 }
762
763 // Match the Destination MAC address
764 MACAddress matchDstMac = flowEntryMatch.dstMac();
765 if ((matchDstMac == null) && (flowPathMatch != null)) {
766 matchDstMac = flowPathMatch.dstMac();
767 }
768 if (matchDstMac != null) {
769 match.setDataLayerDestination(matchDstMac.toString());
770 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
771 }
772
773 // Match the Ethernet Frame Type
774 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
775 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
776 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
777 }
778 if (matchEthernetFrameType != null) {
779 match.setDataLayerType(matchEthernetFrameType);
780 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
781 }
782
783 // Match the VLAN ID
784 Short matchVlanId = flowEntryMatch.vlanId();
785 if ((matchVlanId == null) && (flowPathMatch != null)) {
786 matchVlanId = flowPathMatch.vlanId();
787 }
788 if (matchVlanId != null) {
789 match.setDataLayerVirtualLan(matchVlanId);
790 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
791 }
792
793 // Match the VLAN priority
794 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
795 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
796 matchVlanPriority = flowPathMatch.vlanPriority();
797 }
798 if (matchVlanPriority != null) {
799 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
800 match.setWildcards(match.getWildcards()
801 & ~OFMatch.OFPFW_DL_VLAN_PCP);
802 }
803
804 // Match the Source IPv4 Network prefix
805 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
806 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
807 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
808 }
809 if (matchSrcIPv4Net != null) {
810 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
811 }
812
813 // Natch the Destination IPv4 Network prefix
814 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
815 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
816 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
817 }
818 if (matchDstIPv4Net != null) {
819 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
820 }
821
822 // Match the IP protocol
823 Byte matchIpProto = flowEntryMatch.ipProto();
824 if ((matchIpProto == null) && (flowPathMatch != null)) {
825 matchIpProto = flowPathMatch.ipProto();
826 }
827 if (matchIpProto != null) {
828 match.setNetworkProtocol(matchIpProto);
829 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
830 }
831
832 // Match the IP ToS (DSCP field, 6 bits)
833 Byte matchIpToS = flowEntryMatch.ipToS();
834 if ((matchIpToS == null) && (flowPathMatch != null)) {
835 matchIpToS = flowPathMatch.ipToS();
836 }
837 if (matchIpToS != null) {
838 match.setNetworkTypeOfService(matchIpToS);
839 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
840 }
841
842 // Match the Source TCP/UDP port
843 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
844 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
845 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
846 }
847 if (matchSrcTcpUdpPort != null) {
848 match.setTransportSource(matchSrcTcpUdpPort);
849 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
850 }
851
852 // Match the Destination TCP/UDP port
853 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
854 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
855 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
856 }
857 if (matchDstTcpUdpPort != null) {
858 match.setTransportDestination(matchDstTcpUdpPort);
859 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
860 }
861
862 //
863 // Fetch the actions
864 //
865 Short actionOutputPort = null;
866 List<OFAction> openFlowActions = new ArrayList<OFAction>();
867 int actionsLen = 0;
868 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
869 //
870 for (FlowEntryAction action : flowEntryActions.actions()) {
871 ActionOutput actionOutput = action.actionOutput();
872 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
873 ActionSetVlanPriority actionSetVlanPriority = action
874 .actionSetVlanPriority();
875 ActionStripVlan actionStripVlan = action.actionStripVlan();
876 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
877 .actionSetEthernetSrcAddr();
878 ActionSetEthernetAddr actionSetEthernetDstAddr = action
879 .actionSetEthernetDstAddr();
880 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
881 .actionSetIPv4SrcAddr();
882 ActionSetIPv4Addr actionSetIPv4DstAddr = action
883 .actionSetIPv4DstAddr();
884 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
885 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
886 .actionSetTcpUdpSrcPort();
887 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
888 .actionSetTcpUdpDstPort();
889 ActionEnqueue actionEnqueue = action.actionEnqueue();
890
891 if (actionOutput != null) {
892 actionOutputPort = actionOutput.port().value();
893 // XXX: The max length is hard-coded for now
894 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
895 .value(), (short) 0xffff);
896 openFlowActions.add(ofa);
897 actionsLen += ofa.getLength();
898 }
899
900 if (actionSetVlanId != null) {
901 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
902 actionSetVlanId.vlanId());
903 openFlowActions.add(ofa);
904 actionsLen += ofa.getLength();
905 }
906
907 if (actionSetVlanPriority != null) {
908 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
909 actionSetVlanPriority.vlanPriority());
910 openFlowActions.add(ofa);
911 actionsLen += ofa.getLength();
912 }
913
914 if (actionStripVlan != null) {
915 if (actionStripVlan.stripVlan() == true) {
916 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
917 openFlowActions.add(ofa);
918 actionsLen += ofa.getLength();
919 }
920 }
921
922 if (actionSetEthernetSrcAddr != null) {
923 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
924 actionSetEthernetSrcAddr.addr().toBytes());
925 openFlowActions.add(ofa);
926 actionsLen += ofa.getLength();
927 }
928
929 if (actionSetEthernetDstAddr != null) {
930 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
931 actionSetEthernetDstAddr.addr().toBytes());
932 openFlowActions.add(ofa);
933 actionsLen += ofa.getLength();
934 }
935
936 if (actionSetIPv4SrcAddr != null) {
937 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
938 actionSetIPv4SrcAddr.addr().value());
939 openFlowActions.add(ofa);
940 actionsLen += ofa.getLength();
941 }
942
943 if (actionSetIPv4DstAddr != null) {
944 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
945 actionSetIPv4DstAddr.addr().value());
946 openFlowActions.add(ofa);
947 actionsLen += ofa.getLength();
948 }
949
950 if (actionSetIpToS != null) {
951 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
952 actionSetIpToS.ipToS());
953 openFlowActions.add(ofa);
954 actionsLen += ofa.getLength();
955 }
956
957 if (actionSetTcpUdpSrcPort != null) {
958 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
959 actionSetTcpUdpSrcPort.port());
960 openFlowActions.add(ofa);
961 actionsLen += ofa.getLength();
962 }
963
964 if (actionSetTcpUdpDstPort != null) {
965 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
966 actionSetTcpUdpDstPort.port());
967 openFlowActions.add(ofa);
968 actionsLen += ofa.getLength();
969 }
970
971 if (actionEnqueue != null) {
972 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
973 .value(), actionEnqueue.queueId());
974 openFlowActions.add(ofa);
975 actionsLen += ofa.getLength();
976 }
977 }
978
979 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
980 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
981 .setPriority(PRIORITY_DEFAULT)
982 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
983 .setCommand(flowModCommand).setMatch(match)
984 .setActions(openFlowActions)
985 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
986 fm.setOutPort(OFPort.OFPP_NONE.getValue());
987 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
988 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
989 if (actionOutputPort != null)
990 fm.setOutPort(actionOutputPort);
991 }
992
993 //
994 // TODO: Set the following flag
995 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
996 // See method ForwardingBase::pushRoute()
997 //
998
999 //
1000 // Write the message to the switch
1001 //
1002 log.debug("MEASUREMENT: Installing flow entry "
1003 + flowEntry.flowEntryUserState() + " into switch DPID: "
1004 + sw.getStringId() + " flowEntryId: "
1005 + flowEntry.flowEntryId().toString() + " srcMac: "
1006 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
1007 + matchInPort + " outPort: " + actionOutputPort);
1008
1009 //
1010 // TODO: We should use the OpenFlow Barrier mechanism
1011 // to check for errors, and update the SwitchState
1012 // for a flow entry after the Barrier message is
1013 // is received.
1014 //
1015 // TODO: The FlowEntry Object in Titan should be set
1016 // to FE_SWITCH_UPDATED.
1017 //
1018
1019 return add(sw,fm);
1020 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001021
1022 @Override
1023 public OFBarrierReply barrier(IOFSwitch sw) {
1024 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1025 if (future == null) {
1026 return null;
1027 }
1028
1029 try {
1030 return future.get();
1031 } catch (InterruptedException e) {
1032 e.printStackTrace();
1033 log.error("InterruptedException: {}", e);
1034 return null;
1035 } catch (ExecutionException e) {
1036 e.printStackTrace();
1037 log.error("ExecutionException: {}", e);
1038 return null;
1039 }
1040 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -08001041
Naoki Shiotac1601d32013-11-20 10:47:34 -08001042 @Override
1043 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1044 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -08001045
1046 if (sw == null) {
1047 return null;
1048 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001049
Naoki Shiota81dbe302013-11-21 15:35:38 -08001050 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001051 msg.setXid(sw.getNextTransactionId());
1052 add(sw, msg);
1053
1054 // TODO create Future object of message
1055 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
Naoki Shiota81dbe302013-11-21 15:35:38 -08001056
Naoki Shiotac1601d32013-11-20 10:47:34 -08001057 synchronized (barrierFutures) {
1058 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1059 if (map == null) {
1060 map = new HashMap<Integer,OFBarrierReplyFuture>();
1061 barrierFutures.put(sw.getId(), map);
1062 }
1063 map.put(msg.getXid(), future);
Naoki Shiota81dbe302013-11-21 15:35:38 -08001064 log.debug("Inserted future for {}", msg.getXid());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001065 }
1066
1067 return future;
1068 }
1069
1070 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001071 if (sw == null) {
1072 return null;
1073 }
1074
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001075 return getProcess(sw).queues.get(sw);
1076 }
1077
Naoki Shiotac1601d32013-11-20 10:47:34 -08001078 protected long getHash(IOFSwitch sw) {
1079 // This code assumes DPID is sequentially assigned.
1080 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001081 return sw.getId() % number_thread;
1082 }
1083
Naoki Shiota81dbe302013-11-21 15:35:38 -08001084 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001085 long hash = getHash(sw);
1086
1087 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001088 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001089
1090 @Override
1091 public String getName() {
1092 return "flowpusher";
1093 }
1094
1095 @Override
1096 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1097 return false;
1098 }
1099
1100 @Override
1101 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1102 return false;
1103 }
1104
1105 @Override
1106 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotac1601d32013-11-20 10:47:34 -08001107 Map<Integer,OFBarrierReplyFuture> map = barrierFutures.get(sw.getId());
1108 if (map == null) {
1109 return Command.CONTINUE;
1110 }
1111
1112 OFBarrierReplyFuture future = map.get(msg.getXid());
1113 if (future == null) {
1114 return Command.CONTINUE;
1115 }
1116
1117 log.debug("Received BARRIER_REPLY : {}", msg);
1118 future.deliverFuture(sw, msg);
1119
1120 return Command.CONTINUE;
1121 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001122}