blob: e87f631a22c29564220915c5f5943af8e1534916 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011
Naoki Shiota7d0cf272013-11-05 10:18:12 -080012import org.openflow.protocol.*;
13import org.openflow.protocol.action.*;
14import org.openflow.protocol.factory.BasicFactory;
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070017
18import net.floodlightcontroller.core.FloodlightContext;
19import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080020import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import net.floodlightcontroller.util.OFMessageDamper;
22import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
23import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
24import net.onrc.onos.ofcontroller.util.FlowEntryAction;
25import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080026import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080027import net.onrc.onos.ofcontroller.util.FlowEntryActions;
28import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080029import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
30import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
31import net.onrc.onos.ofcontroller.util.FlowPath;
32import net.onrc.onos.ofcontroller.util.IPv4Net;
33import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070034
35/**
36 * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
37 * FlowPusher controls the rate of sending flow_mods so that connection doesn't overflow.
38 * @author Naoki Shiota
39 *
40 */
Brian O'Connor8c166a72013-11-14 18:41:48 -080041public class FlowPusher implements IFlowPusherService {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080042 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
43
44 // NOTE: Below are moved from FlowManager.
45 // TODO: Values copied from elsewhere (class LearningSwitch).
46 // The local copy should go away!
47 //
48 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
49 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080050
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080051 // Interval of sleep when queue is empty
52 protected static final long SLEEP_MILLI_SEC = 10;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080053 protected static final int SLEEP_NANO_SEC = 0;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080054
55 // Number of messages sent to switch at once
56 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057
58 public static final short PRIORITY_DEFAULT = 100;
59 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
60 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
61
62 public enum QueueState {
63 READY,
64 SUSPENDED,
65 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070066
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067 private class SwitchQueue extends ArrayDeque<OFMessage> {
68 QueueState state;
69
70 // Max rate of sending message (bytes/sec). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080071 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072 long last_sent_time = 0;
73 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080074
75 /**
76 * Check if sending rate is within the rate
77 * @param current Current time
78 * @return true if within the rate
79 */
80 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080081 if (max_rate == 0) {
82 // no limitation
83 return true;
84 }
85
Naoki Shiota7d0cf272013-11-05 10:18:12 -080086 long rate = last_sent_size / (current - last_sent_time);
87
88 if (rate < max_rate) {
89 return true;
90 } else {
91 return false;
92 }
93 }
94
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080095 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080096 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080097 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080098 }
99
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700100 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800101
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800102 private OFMessageDamper messageDamper;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700103
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800104 private FloodlightContext context = null;
105 private BasicFactory factory = null;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800106 private Map<Long, FlowPusherProcess> threadMap = null;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800108 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800109
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700110 private class FlowPusherProcess implements Runnable {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800111 private Map<IOFSwitch,SwitchQueue> queues
112 = new HashMap<IOFSwitch,SwitchQueue>();
113
114 private boolean isStopped = false;
115 private boolean isMsgAdded = false;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800116
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700117 @Override
118 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800119 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700120
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700121 while (true) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800122 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
123 synchronized (queues) {
124 entries = queues.entrySet();
125 }
126
127 // Set taint flag to false at this moment.
128 isMsgAdded = false;
129
130 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800131 IOFSwitch sw = entry.getKey();
132 SwitchQueue queue = entry.getValue();
133
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700134 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800135 if (sw == null || queue == null ||
136 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700137 continue;
138 }
139
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800140 // check sending rate and determine it to be sent or not
141 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800142 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800143
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800144 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800145 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800146 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800147 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800148 // Number of messages excess the limit
149 if (++i >= MAX_MESSAGE_SEND) {
150 // Messages remains in queue
151 isMsgAdded = true;
152 break;
153 }
154
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800155 OFMessage msg = queue.poll();
156
157 // if need to send, call IOFSwitch#write()
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800158 try {
159 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800160 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800161 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800162 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800163 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800164 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800165 }
166 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800167 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800168 queue.logSentData(current_time, size);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700169 }
170 }
171 }
172
173 // sleep while all queues are empty
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800174 while (! (isMsgAdded || isStopped)) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800175 try {
176 Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
177 } catch (InterruptedException e) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800178 e.printStackTrace();
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800179 log.error("Thread.sleep failed");
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800180 }
181 }
182
183 log.debug("Exit sleep loop.");
184
185 if (isStopped) {
186 log.debug("Pusher Process finished.");
187 return;
188 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800189
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700190 }
191 }
192 }
193
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800194 public FlowPusher() {
195
196 }
197
198 public FlowPusher(int number_thread) {
199 this.number_thread = number_thread;
200 }
201
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800202 public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700203 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800204 this.factory = factory;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800205
206 if (damper != null) {
207 messageDamper = damper;
208 } else {
209 // use default value
210 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
211 EnumSet.of(OFType.FLOW_MOD),
212 OFMESSAGE_DAMPER_TIMEOUT);
213 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700214 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800215
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800216 /**
217 * Begin processing queue.
218 */
219 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800220 if (factory == null) {
221 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800222 return;
223 }
224
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800225 threadMap = new HashMap<Long,FlowPusherProcess>();
226 for (long i = 0; i < number_thread; ++i) {
227 FlowPusherProcess runnable = new FlowPusherProcess();
228 threadMap.put(i, runnable);
229
230 Thread thread = new Thread(runnable);
231 thread.start();
232 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700233 }
234
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800235 /**
236 * Suspend processing a queue related to given switch.
237 * @param sw
238 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800239 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800240 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800241 SwitchQueue queue = getQueue(sw);
242
243 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800244 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800245 }
246
247 synchronized (queue) {
248 if (queue.state == QueueState.READY) {
249 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800250 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800251 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800252 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800253 }
254 }
255
256 /**
257 * Resume processing a queue related to given switch.
258 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800259 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800260 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800261 SwitchQueue queue = getQueue(sw);
262
263 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800264 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800265 }
266
267 synchronized (queue) {
268 if (queue.state == QueueState.SUSPENDED) {
269 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800270 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800271 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800272 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800273 }
274 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800275
Brian O'Connor8c166a72013-11-14 18:41:48 -0800276 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800277 public boolean isSuspended(IOFSwitch sw) {
278 SwitchQueue queue = getQueue(sw);
279
280 if (queue == null) {
281 // TODO Is true suitable for this case?
282 return true;
283 }
284
285 return (queue.state == QueueState.SUSPENDED);
286 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800287
288 /**
289 * End processing queue and exit thread.
290 */
291 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800292 if (threadMap == null) {
293 return;
294 }
295
296 for (FlowPusherProcess runnable : threadMap.values()) {
297 if (! runnable.isStopped) {
298 runnable.isStopped = true;
299 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800300 }
301 }
302
303 public void setRate(IOFSwitch sw, long rate) {
304 SwitchQueue queue = getQueue(sw);
305 if (queue == null) {
306 return;
307 }
308
309 if (rate > 0) {
310 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700311 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700312 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700313
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 /**
315 * Add OFMessage to the queue related to given switch.
316 * @param sw
317 * @param msg
318 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800319 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800320 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800321 FlowPusherProcess proc = getProcess(sw);
322 SwitchQueue queue = proc.queues.get(sw);
323
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800324 if (queue == null) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800325 queue = new SwitchQueue();
326 queue.state = QueueState.READY;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800327 synchronized (proc) {
328 proc.queues.put(sw, queue);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800329 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800330 }
331
332 synchronized (queue) {
333 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800334 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800335 }
336
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800337 proc.isMsgAdded = true;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800338
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800339 return true;
340 }
341
342 /**
343 * Create OFMessage from given flow information and add it to the queue.
344 * @param sw
345 * @param flowObj
346 * @param flowEntryObj
347 * @return
348 */
Brian O'Connor8c166a72013-11-14 18:41:48 -0800349 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800350 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800351 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800352 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
353 if (flowEntryIdStr == null)
354 return false;
355 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
356 String userState = flowEntryObj.getUserState();
357 if (userState == null)
358 return false;
359
360 //
361 // Create the Open Flow Flow Modification Entry to push
362 //
363 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
364 long cookie = flowEntryId.value();
365
366 short flowModCommand = OFFlowMod.OFPFC_ADD;
367 if (userState.equals("FE_USER_ADD")) {
368 flowModCommand = OFFlowMod.OFPFC_ADD;
369 } else if (userState.equals("FE_USER_MODIFY")) {
370 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
371 } else if (userState.equals("FE_USER_DELETE")) {
372 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
373 } else {
374 // Unknown user state. Ignore the entry
375 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
376 flowEntryId.toString(), userState);
377 return false;
378 }
379
380 //
381 // Fetch the match conditions.
382 //
383 // NOTE: The Flow matching conditions common for all Flow Entries are
384 // used ONLY if a Flow Entry does NOT have the corresponding matching
385 // condition set.
386 //
387 OFMatch match = new OFMatch();
388 match.setWildcards(OFMatch.OFPFW_ALL);
389
390 // Match the Incoming Port
391 Short matchInPort = flowEntryObj.getMatchInPort();
392 if (matchInPort != null) {
393 match.setInputPort(matchInPort);
394 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
395 }
396
397 // Match the Source MAC address
398 String matchSrcMac = flowEntryObj.getMatchSrcMac();
399 if (matchSrcMac == null)
400 matchSrcMac = flowObj.getMatchSrcMac();
401 if (matchSrcMac != null) {
402 match.setDataLayerSource(matchSrcMac);
403 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
404 }
405
406 // Match the Destination MAC address
407 String matchDstMac = flowEntryObj.getMatchDstMac();
408 if (matchDstMac == null)
409 matchDstMac = flowObj.getMatchDstMac();
410 if (matchDstMac != null) {
411 match.setDataLayerDestination(matchDstMac);
412 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
413 }
414
415 // Match the Ethernet Frame Type
416 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
417 if (matchEthernetFrameType == null)
418 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
419 if (matchEthernetFrameType != null) {
420 match.setDataLayerType(matchEthernetFrameType);
421 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
422 }
423
424 // Match the VLAN ID
425 Short matchVlanId = flowEntryObj.getMatchVlanId();
426 if (matchVlanId == null)
427 matchVlanId = flowObj.getMatchVlanId();
428 if (matchVlanId != null) {
429 match.setDataLayerVirtualLan(matchVlanId);
430 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
431 }
432
433 // Match the VLAN priority
434 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
435 if (matchVlanPriority == null)
436 matchVlanPriority = flowObj.getMatchVlanPriority();
437 if (matchVlanPriority != null) {
438 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
439 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
440 }
441
442 // Match the Source IPv4 Network prefix
443 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
444 if (matchSrcIPv4Net == null)
445 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
446 if (matchSrcIPv4Net != null) {
447 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
448 }
449
450 // Match the Destination IPv4 Network prefix
451 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
452 if (matchDstIPv4Net == null)
453 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
454 if (matchDstIPv4Net != null) {
455 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
456 }
457
458 // Match the IP protocol
459 Byte matchIpProto = flowEntryObj.getMatchIpProto();
460 if (matchIpProto == null)
461 matchIpProto = flowObj.getMatchIpProto();
462 if (matchIpProto != null) {
463 match.setNetworkProtocol(matchIpProto);
464 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
465 }
466
467 // Match the IP ToS (DSCP field, 6 bits)
468 Byte matchIpToS = flowEntryObj.getMatchIpToS();
469 if (matchIpToS == null)
470 matchIpToS = flowObj.getMatchIpToS();
471 if (matchIpToS != null) {
472 match.setNetworkTypeOfService(matchIpToS);
473 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
474 }
475
476 // Match the Source TCP/UDP port
477 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
478 if (matchSrcTcpUdpPort == null)
479 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
480 if (matchSrcTcpUdpPort != null) {
481 match.setTransportSource(matchSrcTcpUdpPort);
482 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
483 }
484
485 // Match the Destination TCP/UDP port
486 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
487 if (matchDstTcpUdpPort == null)
488 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
489 if (matchDstTcpUdpPort != null) {
490 match.setTransportDestination(matchDstTcpUdpPort);
491 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
492 }
493
494 //
495 // Fetch the actions
496 //
497 Short actionOutputPort = null;
498 List<OFAction> openFlowActions = new ArrayList<OFAction>();
499 int actionsLen = 0;
500 FlowEntryActions flowEntryActions = null;
501 String actionsStr = flowEntryObj.getActions();
502 if (actionsStr != null)
503 flowEntryActions = new FlowEntryActions(actionsStr);
504 else
505 flowEntryActions = new FlowEntryActions();
506 for (FlowEntryAction action : flowEntryActions.actions()) {
507 ActionOutput actionOutput = action.actionOutput();
508 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
509 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
510 ActionStripVlan actionStripVlan = action.actionStripVlan();
511 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
512 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
513 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
514 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
515 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
516 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
517 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
518 ActionEnqueue actionEnqueue = action.actionEnqueue();
519
520 if (actionOutput != null) {
521 actionOutputPort = actionOutput.port().value();
522 // XXX: The max length is hard-coded for now
523 OFActionOutput ofa =
524 new OFActionOutput(actionOutput.port().value(),
525 (short)0xffff);
526 openFlowActions.add(ofa);
527 actionsLen += ofa.getLength();
528 }
529
530 if (actionSetVlanId != null) {
531 OFActionVirtualLanIdentifier ofa =
532 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
533 openFlowActions.add(ofa);
534 actionsLen += ofa.getLength();
535 }
536
537 if (actionSetVlanPriority != null) {
538 OFActionVirtualLanPriorityCodePoint ofa =
539 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
540 openFlowActions.add(ofa);
541 actionsLen += ofa.getLength();
542 }
543
544 if (actionStripVlan != null) {
545 if (actionStripVlan.stripVlan() == true) {
546 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
547 openFlowActions.add(ofa);
548 actionsLen += ofa.getLength();
549 }
550 }
551
552 if (actionSetEthernetSrcAddr != null) {
553 OFActionDataLayerSource ofa =
554 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
555 openFlowActions.add(ofa);
556 actionsLen += ofa.getLength();
557 }
558
559 if (actionSetEthernetDstAddr != null) {
560 OFActionDataLayerDestination ofa =
561 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
562 openFlowActions.add(ofa);
563 actionsLen += ofa.getLength();
564 }
565
566 if (actionSetIPv4SrcAddr != null) {
567 OFActionNetworkLayerSource ofa =
568 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
569 openFlowActions.add(ofa);
570 actionsLen += ofa.getLength();
571 }
572
573 if (actionSetIPv4DstAddr != null) {
574 OFActionNetworkLayerDestination ofa =
575 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
576 openFlowActions.add(ofa);
577 actionsLen += ofa.getLength();
578 }
579
580 if (actionSetIpToS != null) {
581 OFActionNetworkTypeOfService ofa =
582 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
583 openFlowActions.add(ofa);
584 actionsLen += ofa.getLength();
585 }
586
587 if (actionSetTcpUdpSrcPort != null) {
588 OFActionTransportLayerSource ofa =
589 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
590 openFlowActions.add(ofa);
591 actionsLen += ofa.getLength();
592 }
593
594 if (actionSetTcpUdpDstPort != null) {
595 OFActionTransportLayerDestination ofa =
596 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
597 openFlowActions.add(ofa);
598 actionsLen += ofa.getLength();
599 }
600
601 if (actionEnqueue != null) {
602 OFActionEnqueue ofa =
603 new OFActionEnqueue(actionEnqueue.port().value(),
604 actionEnqueue.queueId());
605 openFlowActions.add(ofa);
606 actionsLen += ofa.getLength();
607 }
608 }
609
610 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
611 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
612 .setPriority(PRIORITY_DEFAULT)
613 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
614 .setCookie(cookie)
615 .setCommand(flowModCommand)
616 .setMatch(match)
617 .setActions(openFlowActions)
618 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
619 fm.setOutPort(OFPort.OFPP_NONE.getValue());
620 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
621 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
622 if (actionOutputPort != null)
623 fm.setOutPort(actionOutputPort);
624 }
625
626 //
627 // TODO: Set the following flag
628 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
629 // See method ForwardingBase::pushRoute()
630 //
631
632 //
633 // Write the message to the switch
634 //
635 log.debug("MEASUREMENT: Installing flow entry " + userState +
636 " into switch DPID: " +
637 sw.getStringId() +
638 " flowEntryId: " + flowEntryId.toString() +
639 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
640 " inPort: " + matchInPort + " outPort: " + actionOutputPort
641 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800642 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800643 //
644 // TODO: We should use the OpenFlow Barrier mechanism
645 // to check for errors, and update the SwitchState
646 // for a flow entry after the Barrier message is
647 // is received.
648 //
649 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
650
651 return true;
652 }
653
Brian O'Connor8c166a72013-11-14 18:41:48 -0800654 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800655 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
656 //
657 // Create the OpenFlow Flow Modification Entry to push
658 //
659 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
660 long cookie = flowEntry.flowEntryId().value();
661
662 short flowModCommand = OFFlowMod.OFPFC_ADD;
663 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
664 flowModCommand = OFFlowMod.OFPFC_ADD;
665 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
666 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
667 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
668 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
669 } else {
670 // Unknown user state. Ignore the entry
671 log.debug(
672 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
673 flowEntry.flowEntryId().toString(),
674 flowEntry.flowEntryUserState());
675 return false;
676 }
677
678 //
679 // Fetch the match conditions.
680 //
681 // NOTE: The Flow matching conditions common for all Flow Entries are
682 // used ONLY if a Flow Entry does NOT have the corresponding matching
683 // condition set.
684 //
685 OFMatch match = new OFMatch();
686 match.setWildcards(OFMatch.OFPFW_ALL);
687 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
688 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
689
690 // Match the Incoming Port
691 Port matchInPort = flowEntryMatch.inPort();
692 if (matchInPort != null) {
693 match.setInputPort(matchInPort.value());
694 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
695 }
696
697 // Match the Source MAC address
698 MACAddress matchSrcMac = flowEntryMatch.srcMac();
699 if ((matchSrcMac == null) && (flowPathMatch != null)) {
700 matchSrcMac = flowPathMatch.srcMac();
701 }
702 if (matchSrcMac != null) {
703 match.setDataLayerSource(matchSrcMac.toString());
704 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
705 }
706
707 // Match the Destination MAC address
708 MACAddress matchDstMac = flowEntryMatch.dstMac();
709 if ((matchDstMac == null) && (flowPathMatch != null)) {
710 matchDstMac = flowPathMatch.dstMac();
711 }
712 if (matchDstMac != null) {
713 match.setDataLayerDestination(matchDstMac.toString());
714 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
715 }
716
717 // Match the Ethernet Frame Type
718 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
719 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
720 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
721 }
722 if (matchEthernetFrameType != null) {
723 match.setDataLayerType(matchEthernetFrameType);
724 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
725 }
726
727 // Match the VLAN ID
728 Short matchVlanId = flowEntryMatch.vlanId();
729 if ((matchVlanId == null) && (flowPathMatch != null)) {
730 matchVlanId = flowPathMatch.vlanId();
731 }
732 if (matchVlanId != null) {
733 match.setDataLayerVirtualLan(matchVlanId);
734 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
735 }
736
737 // Match the VLAN priority
738 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
739 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
740 matchVlanPriority = flowPathMatch.vlanPriority();
741 }
742 if (matchVlanPriority != null) {
743 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
744 match.setWildcards(match.getWildcards()
745 & ~OFMatch.OFPFW_DL_VLAN_PCP);
746 }
747
748 // Match the Source IPv4 Network prefix
749 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
750 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
751 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
752 }
753 if (matchSrcIPv4Net != null) {
754 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
755 }
756
757 // Natch the Destination IPv4 Network prefix
758 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
759 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
760 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
761 }
762 if (matchDstIPv4Net != null) {
763 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
764 }
765
766 // Match the IP protocol
767 Byte matchIpProto = flowEntryMatch.ipProto();
768 if ((matchIpProto == null) && (flowPathMatch != null)) {
769 matchIpProto = flowPathMatch.ipProto();
770 }
771 if (matchIpProto != null) {
772 match.setNetworkProtocol(matchIpProto);
773 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
774 }
775
776 // Match the IP ToS (DSCP field, 6 bits)
777 Byte matchIpToS = flowEntryMatch.ipToS();
778 if ((matchIpToS == null) && (flowPathMatch != null)) {
779 matchIpToS = flowPathMatch.ipToS();
780 }
781 if (matchIpToS != null) {
782 match.setNetworkTypeOfService(matchIpToS);
783 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
784 }
785
786 // Match the Source TCP/UDP port
787 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
788 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
789 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
790 }
791 if (matchSrcTcpUdpPort != null) {
792 match.setTransportSource(matchSrcTcpUdpPort);
793 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
794 }
795
796 // Match the Destination TCP/UDP port
797 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
798 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
799 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
800 }
801 if (matchDstTcpUdpPort != null) {
802 match.setTransportDestination(matchDstTcpUdpPort);
803 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
804 }
805
806 //
807 // Fetch the actions
808 //
809 Short actionOutputPort = null;
810 List<OFAction> openFlowActions = new ArrayList<OFAction>();
811 int actionsLen = 0;
812 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
813 //
814 for (FlowEntryAction action : flowEntryActions.actions()) {
815 ActionOutput actionOutput = action.actionOutput();
816 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
817 ActionSetVlanPriority actionSetVlanPriority = action
818 .actionSetVlanPriority();
819 ActionStripVlan actionStripVlan = action.actionStripVlan();
820 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
821 .actionSetEthernetSrcAddr();
822 ActionSetEthernetAddr actionSetEthernetDstAddr = action
823 .actionSetEthernetDstAddr();
824 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
825 .actionSetIPv4SrcAddr();
826 ActionSetIPv4Addr actionSetIPv4DstAddr = action
827 .actionSetIPv4DstAddr();
828 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
829 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
830 .actionSetTcpUdpSrcPort();
831 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
832 .actionSetTcpUdpDstPort();
833 ActionEnqueue actionEnqueue = action.actionEnqueue();
834
835 if (actionOutput != null) {
836 actionOutputPort = actionOutput.port().value();
837 // XXX: The max length is hard-coded for now
838 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
839 .value(), (short) 0xffff);
840 openFlowActions.add(ofa);
841 actionsLen += ofa.getLength();
842 }
843
844 if (actionSetVlanId != null) {
845 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
846 actionSetVlanId.vlanId());
847 openFlowActions.add(ofa);
848 actionsLen += ofa.getLength();
849 }
850
851 if (actionSetVlanPriority != null) {
852 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
853 actionSetVlanPriority.vlanPriority());
854 openFlowActions.add(ofa);
855 actionsLen += ofa.getLength();
856 }
857
858 if (actionStripVlan != null) {
859 if (actionStripVlan.stripVlan() == true) {
860 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
861 openFlowActions.add(ofa);
862 actionsLen += ofa.getLength();
863 }
864 }
865
866 if (actionSetEthernetSrcAddr != null) {
867 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
868 actionSetEthernetSrcAddr.addr().toBytes());
869 openFlowActions.add(ofa);
870 actionsLen += ofa.getLength();
871 }
872
873 if (actionSetEthernetDstAddr != null) {
874 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
875 actionSetEthernetDstAddr.addr().toBytes());
876 openFlowActions.add(ofa);
877 actionsLen += ofa.getLength();
878 }
879
880 if (actionSetIPv4SrcAddr != null) {
881 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
882 actionSetIPv4SrcAddr.addr().value());
883 openFlowActions.add(ofa);
884 actionsLen += ofa.getLength();
885 }
886
887 if (actionSetIPv4DstAddr != null) {
888 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
889 actionSetIPv4DstAddr.addr().value());
890 openFlowActions.add(ofa);
891 actionsLen += ofa.getLength();
892 }
893
894 if (actionSetIpToS != null) {
895 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
896 actionSetIpToS.ipToS());
897 openFlowActions.add(ofa);
898 actionsLen += ofa.getLength();
899 }
900
901 if (actionSetTcpUdpSrcPort != null) {
902 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
903 actionSetTcpUdpSrcPort.port());
904 openFlowActions.add(ofa);
905 actionsLen += ofa.getLength();
906 }
907
908 if (actionSetTcpUdpDstPort != null) {
909 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
910 actionSetTcpUdpDstPort.port());
911 openFlowActions.add(ofa);
912 actionsLen += ofa.getLength();
913 }
914
915 if (actionEnqueue != null) {
916 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
917 .value(), actionEnqueue.queueId());
918 openFlowActions.add(ofa);
919 actionsLen += ofa.getLength();
920 }
921 }
922
923 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
924 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
925 .setPriority(PRIORITY_DEFAULT)
926 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
927 .setCommand(flowModCommand).setMatch(match)
928 .setActions(openFlowActions)
929 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
930 fm.setOutPort(OFPort.OFPP_NONE.getValue());
931 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
932 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
933 if (actionOutputPort != null)
934 fm.setOutPort(actionOutputPort);
935 }
936
937 //
938 // TODO: Set the following flag
939 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
940 // See method ForwardingBase::pushRoute()
941 //
942
943 //
944 // Write the message to the switch
945 //
946 log.debug("MEASUREMENT: Installing flow entry "
947 + flowEntry.flowEntryUserState() + " into switch DPID: "
948 + sw.getStringId() + " flowEntryId: "
949 + flowEntry.flowEntryId().toString() + " srcMac: "
950 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
951 + matchInPort + " outPort: " + actionOutputPort);
952
953 //
954 // TODO: We should use the OpenFlow Barrier mechanism
955 // to check for errors, and update the SwitchState
956 // for a flow entry after the Barrier message is
957 // is received.
958 //
959 // TODO: The FlowEntry Object in Titan should be set
960 // to FE_SWITCH_UPDATED.
961 //
962
963 return add(sw,fm);
964 }
965
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800966 private SwitchQueue getQueue(IOFSwitch sw) {
967 if (sw == null) {
968 return null;
969 }
970
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800971 return getProcess(sw).queues.get(sw);
972 }
973
974 private long getHash(IOFSwitch sw) {
975 // TODO should consider equalization algorithm
976 return sw.getId() % number_thread;
977 }
978
979 private FlowPusherProcess getProcess(IOFSwitch sw) {
980 long hash = getHash(sw);
981
982 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800983 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700984}