blob: 5655dfaf8c326b697ded5252231669a35eaa86bc [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011
Naoki Shiota7d0cf272013-11-05 10:18:12 -080012import org.openflow.protocol.*;
13import org.openflow.protocol.action.*;
14import org.openflow.protocol.factory.BasicFactory;
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070017
18import net.floodlightcontroller.core.FloodlightContext;
19import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080020import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import net.floodlightcontroller.util.OFMessageDamper;
22import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
23import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
24import net.onrc.onos.ofcontroller.util.FlowEntryAction;
25import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080026import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080027import net.onrc.onos.ofcontroller.util.FlowEntryActions;
28import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080029import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
30import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
31import net.onrc.onos.ofcontroller.util.FlowPath;
32import net.onrc.onos.ofcontroller.util.IPv4Net;
33import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070034
35/**
36 * FlowPusher mediates the flow_mods sent from FlowManager/FlowSync to switches.
37 * FlowPusher controls the rate at which flow_mods are sent so that the connection does not overflow.
38 * @author Naoki Shiota
39 *
40 */
41public class FlowPusher {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080042 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
43
44 // NOTE: Below are moved from FlowManager.
45 // TODO: Values copied from elsewhere (class LearningSwitch).
46 // The local copy should go away!
47 //
48 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
49 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080050
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080051 // Interval of sleep when queue is empty
52 protected static final long SLEEP_MILLI_SEC = 10;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080053 protected static final int SLEEP_NANO_SEC = 0;
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080054
55 // Number of messages sent to switch at once
56 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057
58 public static final short PRIORITY_DEFAULT = 100;
59 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
60 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
61
/**
 * Processing state of a per-switch message queue.
 */
public enum QueueState {
    /** The worker may send the messages held in the queue. */
    READY,
    /** The worker skips the queue; messages stay queued. */
    SUSPENDED,
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070066
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067 private class SwitchQueue extends ArrayDeque<OFMessage> {
68 QueueState state;
69
70 // Max rate of sending message (bytes/sec). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080071 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072 long last_sent_time = 0;
73 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080074
75 /**
76 * Check if sending rate is within the rate
77 * @param current Current time
78 * @return true if within the rate
79 */
80 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080081 if (max_rate == 0) {
82 // no limitation
83 return true;
84 }
85
Naoki Shiota7d0cf272013-11-05 10:18:12 -080086 long rate = last_sent_size / (current - last_sent_time);
87
88 if (rate < max_rate) {
89 return true;
90 } else {
91 return false;
92 }
93 }
94
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080095 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080096 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080097 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080098 }
99
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700100 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800101
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800102 private OFMessageDamper messageDamper;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700103
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800104 private FloodlightContext context = null;
105 private BasicFactory factory = null;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800106 private Map<Long, FlowPusherProcess> threadMap = null;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800108 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800109
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700110 private class FlowPusherProcess implements Runnable {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800111 private Map<IOFSwitch,SwitchQueue> queues
112 = new HashMap<IOFSwitch,SwitchQueue>();
113
114 private boolean isStopped = false;
115 private boolean isMsgAdded = false;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800116
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700117 @Override
118 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800119 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700120
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700121 while (true) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800122 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
123 synchronized (queues) {
124 entries = queues.entrySet();
125 }
126
127 // Set taint flag to false at this moment.
128 isMsgAdded = false;
129
130 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800131 IOFSwitch sw = entry.getKey();
132 SwitchQueue queue = entry.getValue();
133
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700134 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800135 if (sw == null || queue == null ||
136 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700137 continue;
138 }
139
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800140 // check sending rate and determine it to be sent or not
141 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800142 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800143
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800144 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800145 if (queue.isSendable(current_time)) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800146 int i = 0;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800147 while (! queue.isEmpty()) {
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800148 // Number of messages excess the limit
149 if (++i >= MAX_MESSAGE_SEND) {
150 // Messages remains in queue
151 isMsgAdded = true;
152 break;
153 }
154
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800155 OFMessage msg = queue.poll();
156
157 // if need to send, call IOFSwitch#write()
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800158 try {
159 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800160 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800161 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800162 } catch (IOException e) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800163 e.printStackTrace();
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -0800164 log.error("Exception in sending message ({}) : {}", msg, e);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800165 }
166 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800167 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800168 queue.logSentData(current_time, size);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700169 }
170 }
171 }
172
173 // sleep while all queues are empty
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800174 while (! (isMsgAdded || isStopped)) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800175 try {
176 Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
177 } catch (InterruptedException e) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800178 e.printStackTrace();
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800179 log.error("Thread.sleep failed");
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800180 }
181 }
182
183 log.debug("Exit sleep loop.");
184
185 if (isStopped) {
186 log.debug("Pusher Process finished.");
187 return;
188 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800189
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700190 }
191 }
192 }
193
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800194 public FlowPusher() {
195
196 }
197
/**
 * Create a pusher with the given number of worker threads.
 * @param number_thread Number of worker threads launched by start()
 */
public FlowPusher(int number_thread) {
    this.number_thread = number_thread;
}
201
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800202 public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700203 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800204 this.factory = factory;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800205
206 if (damper != null) {
207 messageDamper = damper;
208 } else {
209 // use default value
210 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
211 EnumSet.of(OFType.FLOW_MOD),
212 OFMESSAGE_DAMPER_TIMEOUT);
213 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700214 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800215
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800216 /**
217 * Begin processing queue.
218 */
219 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800220 if (factory == null) {
221 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800222 return;
223 }
224
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800225 threadMap = new HashMap<Long,FlowPusherProcess>();
226 for (long i = 0; i < number_thread; ++i) {
227 FlowPusherProcess runnable = new FlowPusherProcess();
228 threadMap.put(i, runnable);
229
230 Thread thread = new Thread(runnable);
231 thread.start();
232 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700233 }
234
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800235 /**
236 * Suspend processing a queue related to given switch.
237 * @param sw
238 */
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800239 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800240 SwitchQueue queue = getQueue(sw);
241
242 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800243 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800244 }
245
246 synchronized (queue) {
247 if (queue.state == QueueState.READY) {
248 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800249 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800250 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800251 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800252 }
253 }
254
255 /**
256 * Resume processing a queue related to given switch.
257 */
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800258 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800259 SwitchQueue queue = getQueue(sw);
260
261 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800262 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800263 }
264
265 synchronized (queue) {
266 if (queue.state == QueueState.SUSPENDED) {
267 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800268 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800269 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800270 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800271 }
272 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800273
274 public boolean isSuspended(IOFSwitch sw) {
275 SwitchQueue queue = getQueue(sw);
276
277 if (queue == null) {
278 // TODO Is true suitable for this case?
279 return true;
280 }
281
282 return (queue.state == QueueState.SUSPENDED);
283 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800284
285 /**
286 * End processing queue and exit thread.
287 */
288 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800289 if (threadMap == null) {
290 return;
291 }
292
293 for (FlowPusherProcess runnable : threadMap.values()) {
294 if (! runnable.isStopped) {
295 runnable.isStopped = true;
296 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800297 }
298 }
299
300 public void setRate(IOFSwitch sw, long rate) {
301 SwitchQueue queue = getQueue(sw);
302 if (queue == null) {
303 return;
304 }
305
306 if (rate > 0) {
307 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700308 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700309 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700310
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800311 /**
312 * Add OFMessage to the queue related to given switch.
313 * @param sw
314 * @param msg
315 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800316 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800317 FlowPusherProcess proc = getProcess(sw);
318 SwitchQueue queue = proc.queues.get(sw);
319
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800320 if (queue == null) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800321 queue = new SwitchQueue();
322 queue.state = QueueState.READY;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800323 synchronized (proc) {
324 proc.queues.put(sw, queue);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800325 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800326 }
327
328 synchronized (queue) {
329 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800330 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800331 }
332
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800333 proc.isMsgAdded = true;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800334
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800335 return true;
336 }
337
338 /**
339 * Create OFMessage from given flow information and add it to the queue.
340 * @param sw
341 * @param flowObj
342 * @param flowEntryObj
343 * @return
344 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800345 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800346 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800347 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
348 if (flowEntryIdStr == null)
349 return false;
350 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
351 String userState = flowEntryObj.getUserState();
352 if (userState == null)
353 return false;
354
355 //
356 // Create the Open Flow Flow Modification Entry to push
357 //
358 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
359 long cookie = flowEntryId.value();
360
361 short flowModCommand = OFFlowMod.OFPFC_ADD;
362 if (userState.equals("FE_USER_ADD")) {
363 flowModCommand = OFFlowMod.OFPFC_ADD;
364 } else if (userState.equals("FE_USER_MODIFY")) {
365 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
366 } else if (userState.equals("FE_USER_DELETE")) {
367 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
368 } else {
369 // Unknown user state. Ignore the entry
370 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
371 flowEntryId.toString(), userState);
372 return false;
373 }
374
375 //
376 // Fetch the match conditions.
377 //
378 // NOTE: The Flow matching conditions common for all Flow Entries are
379 // used ONLY if a Flow Entry does NOT have the corresponding matching
380 // condition set.
381 //
382 OFMatch match = new OFMatch();
383 match.setWildcards(OFMatch.OFPFW_ALL);
384
385 // Match the Incoming Port
386 Short matchInPort = flowEntryObj.getMatchInPort();
387 if (matchInPort != null) {
388 match.setInputPort(matchInPort);
389 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
390 }
391
392 // Match the Source MAC address
393 String matchSrcMac = flowEntryObj.getMatchSrcMac();
394 if (matchSrcMac == null)
395 matchSrcMac = flowObj.getMatchSrcMac();
396 if (matchSrcMac != null) {
397 match.setDataLayerSource(matchSrcMac);
398 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
399 }
400
401 // Match the Destination MAC address
402 String matchDstMac = flowEntryObj.getMatchDstMac();
403 if (matchDstMac == null)
404 matchDstMac = flowObj.getMatchDstMac();
405 if (matchDstMac != null) {
406 match.setDataLayerDestination(matchDstMac);
407 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
408 }
409
410 // Match the Ethernet Frame Type
411 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
412 if (matchEthernetFrameType == null)
413 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
414 if (matchEthernetFrameType != null) {
415 match.setDataLayerType(matchEthernetFrameType);
416 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
417 }
418
419 // Match the VLAN ID
420 Short matchVlanId = flowEntryObj.getMatchVlanId();
421 if (matchVlanId == null)
422 matchVlanId = flowObj.getMatchVlanId();
423 if (matchVlanId != null) {
424 match.setDataLayerVirtualLan(matchVlanId);
425 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
426 }
427
428 // Match the VLAN priority
429 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
430 if (matchVlanPriority == null)
431 matchVlanPriority = flowObj.getMatchVlanPriority();
432 if (matchVlanPriority != null) {
433 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
434 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
435 }
436
437 // Match the Source IPv4 Network prefix
438 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
439 if (matchSrcIPv4Net == null)
440 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
441 if (matchSrcIPv4Net != null) {
442 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
443 }
444
445 // Match the Destination IPv4 Network prefix
446 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
447 if (matchDstIPv4Net == null)
448 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
449 if (matchDstIPv4Net != null) {
450 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
451 }
452
453 // Match the IP protocol
454 Byte matchIpProto = flowEntryObj.getMatchIpProto();
455 if (matchIpProto == null)
456 matchIpProto = flowObj.getMatchIpProto();
457 if (matchIpProto != null) {
458 match.setNetworkProtocol(matchIpProto);
459 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
460 }
461
462 // Match the IP ToS (DSCP field, 6 bits)
463 Byte matchIpToS = flowEntryObj.getMatchIpToS();
464 if (matchIpToS == null)
465 matchIpToS = flowObj.getMatchIpToS();
466 if (matchIpToS != null) {
467 match.setNetworkTypeOfService(matchIpToS);
468 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
469 }
470
471 // Match the Source TCP/UDP port
472 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
473 if (matchSrcTcpUdpPort == null)
474 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
475 if (matchSrcTcpUdpPort != null) {
476 match.setTransportSource(matchSrcTcpUdpPort);
477 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
478 }
479
480 // Match the Destination TCP/UDP port
481 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
482 if (matchDstTcpUdpPort == null)
483 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
484 if (matchDstTcpUdpPort != null) {
485 match.setTransportDestination(matchDstTcpUdpPort);
486 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
487 }
488
489 //
490 // Fetch the actions
491 //
492 Short actionOutputPort = null;
493 List<OFAction> openFlowActions = new ArrayList<OFAction>();
494 int actionsLen = 0;
495 FlowEntryActions flowEntryActions = null;
496 String actionsStr = flowEntryObj.getActions();
497 if (actionsStr != null)
498 flowEntryActions = new FlowEntryActions(actionsStr);
499 else
500 flowEntryActions = new FlowEntryActions();
501 for (FlowEntryAction action : flowEntryActions.actions()) {
502 ActionOutput actionOutput = action.actionOutput();
503 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
504 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
505 ActionStripVlan actionStripVlan = action.actionStripVlan();
506 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
507 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
508 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
509 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
510 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
511 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
512 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
513 ActionEnqueue actionEnqueue = action.actionEnqueue();
514
515 if (actionOutput != null) {
516 actionOutputPort = actionOutput.port().value();
517 // XXX: The max length is hard-coded for now
518 OFActionOutput ofa =
519 new OFActionOutput(actionOutput.port().value(),
520 (short)0xffff);
521 openFlowActions.add(ofa);
522 actionsLen += ofa.getLength();
523 }
524
525 if (actionSetVlanId != null) {
526 OFActionVirtualLanIdentifier ofa =
527 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
528 openFlowActions.add(ofa);
529 actionsLen += ofa.getLength();
530 }
531
532 if (actionSetVlanPriority != null) {
533 OFActionVirtualLanPriorityCodePoint ofa =
534 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
535 openFlowActions.add(ofa);
536 actionsLen += ofa.getLength();
537 }
538
539 if (actionStripVlan != null) {
540 if (actionStripVlan.stripVlan() == true) {
541 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
542 openFlowActions.add(ofa);
543 actionsLen += ofa.getLength();
544 }
545 }
546
547 if (actionSetEthernetSrcAddr != null) {
548 OFActionDataLayerSource ofa =
549 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
550 openFlowActions.add(ofa);
551 actionsLen += ofa.getLength();
552 }
553
554 if (actionSetEthernetDstAddr != null) {
555 OFActionDataLayerDestination ofa =
556 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
557 openFlowActions.add(ofa);
558 actionsLen += ofa.getLength();
559 }
560
561 if (actionSetIPv4SrcAddr != null) {
562 OFActionNetworkLayerSource ofa =
563 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
564 openFlowActions.add(ofa);
565 actionsLen += ofa.getLength();
566 }
567
568 if (actionSetIPv4DstAddr != null) {
569 OFActionNetworkLayerDestination ofa =
570 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
571 openFlowActions.add(ofa);
572 actionsLen += ofa.getLength();
573 }
574
575 if (actionSetIpToS != null) {
576 OFActionNetworkTypeOfService ofa =
577 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
578 openFlowActions.add(ofa);
579 actionsLen += ofa.getLength();
580 }
581
582 if (actionSetTcpUdpSrcPort != null) {
583 OFActionTransportLayerSource ofa =
584 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
585 openFlowActions.add(ofa);
586 actionsLen += ofa.getLength();
587 }
588
589 if (actionSetTcpUdpDstPort != null) {
590 OFActionTransportLayerDestination ofa =
591 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
592 openFlowActions.add(ofa);
593 actionsLen += ofa.getLength();
594 }
595
596 if (actionEnqueue != null) {
597 OFActionEnqueue ofa =
598 new OFActionEnqueue(actionEnqueue.port().value(),
599 actionEnqueue.queueId());
600 openFlowActions.add(ofa);
601 actionsLen += ofa.getLength();
602 }
603 }
604
605 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
606 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
607 .setPriority(PRIORITY_DEFAULT)
608 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
609 .setCookie(cookie)
610 .setCommand(flowModCommand)
611 .setMatch(match)
612 .setActions(openFlowActions)
613 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
614 fm.setOutPort(OFPort.OFPP_NONE.getValue());
615 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
616 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
617 if (actionOutputPort != null)
618 fm.setOutPort(actionOutputPort);
619 }
620
621 //
622 // TODO: Set the following flag
623 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
624 // See method ForwardingBase::pushRoute()
625 //
626
627 //
628 // Write the message to the switch
629 //
630 log.debug("MEASUREMENT: Installing flow entry " + userState +
631 " into switch DPID: " +
632 sw.getStringId() +
633 " flowEntryId: " + flowEntryId.toString() +
634 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
635 " inPort: " + matchInPort + " outPort: " + actionOutputPort
636 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800637 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800638 //
639 // TODO: We should use the OpenFlow Barrier mechanism
640 // to check for errors, and update the SwitchState
641 // for a flow entry after the Barrier message is
642 // is received.
643 //
644 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
645
646 return true;
647 }
648
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800649 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
650 //
651 // Create the OpenFlow Flow Modification Entry to push
652 //
653 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
654 long cookie = flowEntry.flowEntryId().value();
655
656 short flowModCommand = OFFlowMod.OFPFC_ADD;
657 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
658 flowModCommand = OFFlowMod.OFPFC_ADD;
659 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
660 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
661 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
662 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
663 } else {
664 // Unknown user state. Ignore the entry
665 log.debug(
666 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
667 flowEntry.flowEntryId().toString(),
668 flowEntry.flowEntryUserState());
669 return false;
670 }
671
672 //
673 // Fetch the match conditions.
674 //
675 // NOTE: The Flow matching conditions common for all Flow Entries are
676 // used ONLY if a Flow Entry does NOT have the corresponding matching
677 // condition set.
678 //
679 OFMatch match = new OFMatch();
680 match.setWildcards(OFMatch.OFPFW_ALL);
681 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
682 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
683
684 // Match the Incoming Port
685 Port matchInPort = flowEntryMatch.inPort();
686 if (matchInPort != null) {
687 match.setInputPort(matchInPort.value());
688 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
689 }
690
691 // Match the Source MAC address
692 MACAddress matchSrcMac = flowEntryMatch.srcMac();
693 if ((matchSrcMac == null) && (flowPathMatch != null)) {
694 matchSrcMac = flowPathMatch.srcMac();
695 }
696 if (matchSrcMac != null) {
697 match.setDataLayerSource(matchSrcMac.toString());
698 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
699 }
700
701 // Match the Destination MAC address
702 MACAddress matchDstMac = flowEntryMatch.dstMac();
703 if ((matchDstMac == null) && (flowPathMatch != null)) {
704 matchDstMac = flowPathMatch.dstMac();
705 }
706 if (matchDstMac != null) {
707 match.setDataLayerDestination(matchDstMac.toString());
708 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
709 }
710
711 // Match the Ethernet Frame Type
712 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
713 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
714 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
715 }
716 if (matchEthernetFrameType != null) {
717 match.setDataLayerType(matchEthernetFrameType);
718 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
719 }
720
721 // Match the VLAN ID
722 Short matchVlanId = flowEntryMatch.vlanId();
723 if ((matchVlanId == null) && (flowPathMatch != null)) {
724 matchVlanId = flowPathMatch.vlanId();
725 }
726 if (matchVlanId != null) {
727 match.setDataLayerVirtualLan(matchVlanId);
728 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
729 }
730
731 // Match the VLAN priority
732 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
733 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
734 matchVlanPriority = flowPathMatch.vlanPriority();
735 }
736 if (matchVlanPriority != null) {
737 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
738 match.setWildcards(match.getWildcards()
739 & ~OFMatch.OFPFW_DL_VLAN_PCP);
740 }
741
742 // Match the Source IPv4 Network prefix
743 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
744 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
745 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
746 }
747 if (matchSrcIPv4Net != null) {
748 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
749 }
750
751 // Match the Destination IPv4 Network prefix
752 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
753 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
754 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
755 }
756 if (matchDstIPv4Net != null) {
757 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
758 }
759
760 // Match the IP protocol
761 Byte matchIpProto = flowEntryMatch.ipProto();
762 if ((matchIpProto == null) && (flowPathMatch != null)) {
763 matchIpProto = flowPathMatch.ipProto();
764 }
765 if (matchIpProto != null) {
766 match.setNetworkProtocol(matchIpProto);
767 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
768 }
769
770 // Match the IP ToS (DSCP field, 6 bits)
771 Byte matchIpToS = flowEntryMatch.ipToS();
772 if ((matchIpToS == null) && (flowPathMatch != null)) {
773 matchIpToS = flowPathMatch.ipToS();
774 }
775 if (matchIpToS != null) {
776 match.setNetworkTypeOfService(matchIpToS);
777 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
778 }
779
780 // Match the Source TCP/UDP port
781 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
782 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
783 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
784 }
785 if (matchSrcTcpUdpPort != null) {
786 match.setTransportSource(matchSrcTcpUdpPort);
787 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
788 }
789
790 // Match the Destination TCP/UDP port
791 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
792 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
793 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
794 }
795 if (matchDstTcpUdpPort != null) {
796 match.setTransportDestination(matchDstTcpUdpPort);
797 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
798 }
799
800 //
801 // Fetch the actions
802 //
803 Short actionOutputPort = null;
804 List<OFAction> openFlowActions = new ArrayList<OFAction>();
805 int actionsLen = 0;
806 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
807 //
808 for (FlowEntryAction action : flowEntryActions.actions()) {
809 ActionOutput actionOutput = action.actionOutput();
810 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
811 ActionSetVlanPriority actionSetVlanPriority = action
812 .actionSetVlanPriority();
813 ActionStripVlan actionStripVlan = action.actionStripVlan();
814 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
815 .actionSetEthernetSrcAddr();
816 ActionSetEthernetAddr actionSetEthernetDstAddr = action
817 .actionSetEthernetDstAddr();
818 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
819 .actionSetIPv4SrcAddr();
820 ActionSetIPv4Addr actionSetIPv4DstAddr = action
821 .actionSetIPv4DstAddr();
822 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
823 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
824 .actionSetTcpUdpSrcPort();
825 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
826 .actionSetTcpUdpDstPort();
827 ActionEnqueue actionEnqueue = action.actionEnqueue();
828
829 if (actionOutput != null) {
830 actionOutputPort = actionOutput.port().value();
831 // XXX: The max length is hard-coded for now
832 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
833 .value(), (short) 0xffff);
834 openFlowActions.add(ofa);
835 actionsLen += ofa.getLength();
836 }
837
838 if (actionSetVlanId != null) {
839 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
840 actionSetVlanId.vlanId());
841 openFlowActions.add(ofa);
842 actionsLen += ofa.getLength();
843 }
844
845 if (actionSetVlanPriority != null) {
846 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
847 actionSetVlanPriority.vlanPriority());
848 openFlowActions.add(ofa);
849 actionsLen += ofa.getLength();
850 }
851
852 if (actionStripVlan != null) {
853 if (actionStripVlan.stripVlan() == true) {
854 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
855 openFlowActions.add(ofa);
856 actionsLen += ofa.getLength();
857 }
858 }
859
860 if (actionSetEthernetSrcAddr != null) {
861 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
862 actionSetEthernetSrcAddr.addr().toBytes());
863 openFlowActions.add(ofa);
864 actionsLen += ofa.getLength();
865 }
866
867 if (actionSetEthernetDstAddr != null) {
868 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
869 actionSetEthernetDstAddr.addr().toBytes());
870 openFlowActions.add(ofa);
871 actionsLen += ofa.getLength();
872 }
873
874 if (actionSetIPv4SrcAddr != null) {
875 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
876 actionSetIPv4SrcAddr.addr().value());
877 openFlowActions.add(ofa);
878 actionsLen += ofa.getLength();
879 }
880
881 if (actionSetIPv4DstAddr != null) {
882 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
883 actionSetIPv4DstAddr.addr().value());
884 openFlowActions.add(ofa);
885 actionsLen += ofa.getLength();
886 }
887
888 if (actionSetIpToS != null) {
889 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
890 actionSetIpToS.ipToS());
891 openFlowActions.add(ofa);
892 actionsLen += ofa.getLength();
893 }
894
895 if (actionSetTcpUdpSrcPort != null) {
896 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
897 actionSetTcpUdpSrcPort.port());
898 openFlowActions.add(ofa);
899 actionsLen += ofa.getLength();
900 }
901
902 if (actionSetTcpUdpDstPort != null) {
903 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
904 actionSetTcpUdpDstPort.port());
905 openFlowActions.add(ofa);
906 actionsLen += ofa.getLength();
907 }
908
909 if (actionEnqueue != null) {
910 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
911 .value(), actionEnqueue.queueId());
912 openFlowActions.add(ofa);
913 actionsLen += ofa.getLength();
914 }
915 }
916
917 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
918 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
919 .setPriority(PRIORITY_DEFAULT)
920 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
921 .setCommand(flowModCommand).setMatch(match)
922 .setActions(openFlowActions)
923 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
924 fm.setOutPort(OFPort.OFPP_NONE.getValue());
925 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
926 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
927 if (actionOutputPort != null)
928 fm.setOutPort(actionOutputPort);
929 }
930
931 //
932 // TODO: Set the following flag
933 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
934 // See method ForwardingBase::pushRoute()
935 //
936
937 //
938 // Write the message to the switch
939 //
940 log.debug("MEASUREMENT: Installing flow entry "
941 + flowEntry.flowEntryUserState() + " into switch DPID: "
942 + sw.getStringId() + " flowEntryId: "
943 + flowEntry.flowEntryId().toString() + " srcMac: "
944 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
945 + matchInPort + " outPort: " + actionOutputPort);
946
947 //
948 // TODO: We should use the OpenFlow Barrier mechanism
949 // to check for errors, and update the SwitchState
950 // for a flow entry after the Barrier message is
951 // received.
952 //
953 // TODO: The FlowEntry Object in Titan should be set
954 // to FE_SWITCH_UPDATED.
955 //
956
957 return add(sw,fm);
958 }
959
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800960 private SwitchQueue getQueue(IOFSwitch sw) {
961 if (sw == null) {
962 return null;
963 }
964
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800965 return getProcess(sw).queues.get(sw);
966 }
967
968 private long getHash(IOFSwitch sw) {
969 // TODO should consider equalization algorithm
970 return sw.getId() % number_thread;
971 }
972
973 private FlowPusherProcess getProcess(IOFSwitch sw) {
974 long hash = getHash(sw);
975
976 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800977 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700978}