blob: 6c7a2bb7a7e6b31fe1c95b40c68ade56e4794c30 [file] [log] [blame]
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001package net.onrc.onos.ofcontroller.flowmanager;
2
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070010
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import org.openflow.protocol.*;
12import org.openflow.protocol.action.*;
13import org.openflow.protocol.factory.BasicFactory;
14import org.slf4j.Logger;
15import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070016
17import net.floodlightcontroller.core.FloodlightContext;
18import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080019import net.floodlightcontroller.util.OFMessageDamper;
20import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
21import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
22import net.onrc.onos.ofcontroller.util.FlowEntryAction;
23import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
24import net.onrc.onos.ofcontroller.util.FlowEntryActions;
25import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026
27/**
 * FlowPusher mediates the flow_mod messages sent from FlowManager/FlowSync to switches.
 * It controls the rate at which flow_mods are sent so that the connection doesn't overflow.
30 * @author Naoki Shiota
31 *
32 */
33public class FlowPusher {
// Class-wide logger.
private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);

// NOTE: Below are moved from FlowManager.
// TODO: Values copied from elsewhere (class LearningSwitch).
// The local copy should go away!
//
// Both values are handed to the OFMessageDamper constructor in init().
protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250;	// ms

// Defaults applied to every flow_mod built by send(IOFSwitch, IFlowPath, IFlowEntry).
public static final short PRIORITY_DEFAULT = 100;
public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0;	// infinite
public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0;	// infinite
46
/**
 * Processing state of a per-switch queue.
 * READY queues are drained by the pusher thread; suspend() moves a queue
 * to SUSPENDED and resume() moves it back to READY.
 */
public enum QueueState {
    READY,
    SUSPENDED,
}
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070051
Naoki Shiota7d0cf272013-11-05 10:18:12 -080052 private class SwitchQueue extends ArrayDeque<OFMessage> {
53 QueueState state;
54
55 // Max rate of sending message (bytes/sec). 0 implies no limitation.
56 long max_rate = 0;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070057 long last_sent_time = 0;
58 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059
60 /**
61 * Check if sending rate is within the rate
62 * @param current Current time
63 * @return true if within the rate
64 */
65 boolean isSendable(long current) {
66 long rate = last_sent_size / (current - last_sent_time);
67
68 if (rate < max_rate) {
69 return true;
70 } else {
71 return false;
72 }
73 }
74
75 void updateRate(long current, OFMessage msg) {
76 last_sent_time = current;
77 last_sent_size = msg.getLengthU();
78 }
79
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070080 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -080081
// Pending messages per switch; filled by send() and drained by FlowPusherProcess.
private Map<IOFSwitch,SwitchQueue> queues
    = new HashMap<IOFSwitch,SwitchQueue>();

// Created in init(); FlowPusherProcess writes every message through it.
private OFMessageDamper messageDamper;

// Set by init(); passed along with each message to messageDamper.write().
private FloodlightContext context = null;
// Set by init(); used to create the FLOW_MOD messages.
private BasicFactory factory = null;
// Worker thread created by start().
private Thread thread = null;
90
91
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070092 private class FlowPusherProcess implements Runnable {
93 @Override
94 public void run() {
Naoki Shiotac2a699a2013-10-31 15:36:01 -070095
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070096 while (true) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097 for (Map.Entry<IOFSwitch,SwitchQueue> entry : queues.entrySet()) {
98 IOFSwitch sw = entry.getKey();
99 SwitchQueue queue = entry.getValue();
100
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700101 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800102 if (sw == null || queue == null ||
103 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700104 continue;
105 }
106
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107 synchronized (queue) {
108 // TODO send multiple messages at once
109
110 while (! queue.isEmpty()) {
111 OFMessage msg = queue.poll();
112 // check sending rate and determine it to be sent or not
113 long current_time = System.nanoTime();
114
115 // if need to send, call IOFSwitch#write()
116 if (queue.isSendable(current_time)) {
117 try {
118 messageDamper.write(sw, msg, context);
119 queue.updateRate(current_time, msg);
120 } catch (IOException e) {
121 // TODO Auto-generated catch block
122 e.printStackTrace();
123 }
124 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700125 }
126 }
127 }
128
129 // sleep while all queues are empty
130 boolean sleep = true;
131 do {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800132
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700133 // TODO check if queues are empty
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700134 } while (sleep);
135 }
136 }
137 }
138
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800139 public void init(FloodlightContext context, BasicFactory factory) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700140 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800141 this.factory = factory;
142 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
143 EnumSet.of(OFType.FLOW_MOD),
144 OFMESSAGE_DAMPER_TIMEOUT);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700145 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800146 /**
147 * Begin processing queue.
148 */
149 public void start() {
150 if (context == null || factory == null) {
151 // not yet initialized
152 return;
153 }
154
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700155 thread = new Thread(new FlowPusherProcess());
156 thread.start();
157 }
158
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800159 /**
160 * Suspend processing a queue related to given switch.
161 * @param sw
162 */
163 public void suspend(IOFSwitch sw) {
164 SwitchQueue queue = getQueue(sw);
165
166 if (queue == null) {
167 return;
168 }
169
170 synchronized (queue) {
171 if (queue.state == QueueState.READY) {
172 queue.state = QueueState.SUSPENDED;
173 }
174 }
175 }
176
177 /**
178 * Resume processing a queue related to given switch.
179 */
180 public void resume(IOFSwitch sw) {
181 SwitchQueue queue = getQueue(sw);
182
183 if (queue == null) {
184 return;
185 }
186
187 synchronized (queue) {
188 if (queue.state == QueueState.SUSPENDED) {
189 queue.state = QueueState.READY;
190 }
191 }
192 }
193
194 /**
195 * End processing queue and exit thread.
196 */
197 public void stop() {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700198 if (thread != null && thread.isAlive()) {
199 // TODO tell thread to halt
200 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700201 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700202
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800203 /**
204 * Add OFMessage to the queue related to given switch.
205 * @param sw
206 * @param msg
207 */
208 public boolean send(IOFSwitch sw, OFMessage msg) {
209 SwitchQueue queue = getQueue(sw);
210 if (queue == null) {
211 queues.put(sw, new SwitchQueue());
212 }
213
214 synchronized (queue) {
215 queue.add(msg);
216 }
217
218 return true;
219 }
220
221 /**
222 * Create OFMessage from given flow information and add it to the queue.
223 * @param sw
224 * @param flowObj
225 * @param flowEntryObj
226 * @return
227 */
228 public boolean send(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
229 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
230 if (flowEntryIdStr == null)
231 return false;
232 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
233 String userState = flowEntryObj.getUserState();
234 if (userState == null)
235 return false;
236
237 //
238 // Create the Open Flow Flow Modification Entry to push
239 //
240 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
241 long cookie = flowEntryId.value();
242
243 short flowModCommand = OFFlowMod.OFPFC_ADD;
244 if (userState.equals("FE_USER_ADD")) {
245 flowModCommand = OFFlowMod.OFPFC_ADD;
246 } else if (userState.equals("FE_USER_MODIFY")) {
247 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
248 } else if (userState.equals("FE_USER_DELETE")) {
249 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
250 } else {
251 // Unknown user state. Ignore the entry
252 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
253 flowEntryId.toString(), userState);
254 return false;
255 }
256
257 //
258 // Fetch the match conditions.
259 //
260 // NOTE: The Flow matching conditions common for all Flow Entries are
261 // used ONLY if a Flow Entry does NOT have the corresponding matching
262 // condition set.
263 //
264 OFMatch match = new OFMatch();
265 match.setWildcards(OFMatch.OFPFW_ALL);
266
267 // Match the Incoming Port
268 Short matchInPort = flowEntryObj.getMatchInPort();
269 if (matchInPort != null) {
270 match.setInputPort(matchInPort);
271 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
272 }
273
274 // Match the Source MAC address
275 String matchSrcMac = flowEntryObj.getMatchSrcMac();
276 if (matchSrcMac == null)
277 matchSrcMac = flowObj.getMatchSrcMac();
278 if (matchSrcMac != null) {
279 match.setDataLayerSource(matchSrcMac);
280 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
281 }
282
283 // Match the Destination MAC address
284 String matchDstMac = flowEntryObj.getMatchDstMac();
285 if (matchDstMac == null)
286 matchDstMac = flowObj.getMatchDstMac();
287 if (matchDstMac != null) {
288 match.setDataLayerDestination(matchDstMac);
289 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
290 }
291
292 // Match the Ethernet Frame Type
293 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
294 if (matchEthernetFrameType == null)
295 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
296 if (matchEthernetFrameType != null) {
297 match.setDataLayerType(matchEthernetFrameType);
298 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
299 }
300
301 // Match the VLAN ID
302 Short matchVlanId = flowEntryObj.getMatchVlanId();
303 if (matchVlanId == null)
304 matchVlanId = flowObj.getMatchVlanId();
305 if (matchVlanId != null) {
306 match.setDataLayerVirtualLan(matchVlanId);
307 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
308 }
309
310 // Match the VLAN priority
311 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
312 if (matchVlanPriority == null)
313 matchVlanPriority = flowObj.getMatchVlanPriority();
314 if (matchVlanPriority != null) {
315 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
316 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
317 }
318
319 // Match the Source IPv4 Network prefix
320 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
321 if (matchSrcIPv4Net == null)
322 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
323 if (matchSrcIPv4Net != null) {
324 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
325 }
326
327 // Match the Destination IPv4 Network prefix
328 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
329 if (matchDstIPv4Net == null)
330 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
331 if (matchDstIPv4Net != null) {
332 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
333 }
334
335 // Match the IP protocol
336 Byte matchIpProto = flowEntryObj.getMatchIpProto();
337 if (matchIpProto == null)
338 matchIpProto = flowObj.getMatchIpProto();
339 if (matchIpProto != null) {
340 match.setNetworkProtocol(matchIpProto);
341 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
342 }
343
344 // Match the IP ToS (DSCP field, 6 bits)
345 Byte matchIpToS = flowEntryObj.getMatchIpToS();
346 if (matchIpToS == null)
347 matchIpToS = flowObj.getMatchIpToS();
348 if (matchIpToS != null) {
349 match.setNetworkTypeOfService(matchIpToS);
350 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
351 }
352
353 // Match the Source TCP/UDP port
354 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
355 if (matchSrcTcpUdpPort == null)
356 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
357 if (matchSrcTcpUdpPort != null) {
358 match.setTransportSource(matchSrcTcpUdpPort);
359 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
360 }
361
362 // Match the Destination TCP/UDP port
363 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
364 if (matchDstTcpUdpPort == null)
365 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
366 if (matchDstTcpUdpPort != null) {
367 match.setTransportDestination(matchDstTcpUdpPort);
368 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
369 }
370
371 //
372 // Fetch the actions
373 //
374 Short actionOutputPort = null;
375 List<OFAction> openFlowActions = new ArrayList<OFAction>();
376 int actionsLen = 0;
377 FlowEntryActions flowEntryActions = null;
378 String actionsStr = flowEntryObj.getActions();
379 if (actionsStr != null)
380 flowEntryActions = new FlowEntryActions(actionsStr);
381 else
382 flowEntryActions = new FlowEntryActions();
383 for (FlowEntryAction action : flowEntryActions.actions()) {
384 ActionOutput actionOutput = action.actionOutput();
385 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
386 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
387 ActionStripVlan actionStripVlan = action.actionStripVlan();
388 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
389 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
390 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
391 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
392 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
393 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
394 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
395 ActionEnqueue actionEnqueue = action.actionEnqueue();
396
397 if (actionOutput != null) {
398 actionOutputPort = actionOutput.port().value();
399 // XXX: The max length is hard-coded for now
400 OFActionOutput ofa =
401 new OFActionOutput(actionOutput.port().value(),
402 (short)0xffff);
403 openFlowActions.add(ofa);
404 actionsLen += ofa.getLength();
405 }
406
407 if (actionSetVlanId != null) {
408 OFActionVirtualLanIdentifier ofa =
409 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
410 openFlowActions.add(ofa);
411 actionsLen += ofa.getLength();
412 }
413
414 if (actionSetVlanPriority != null) {
415 OFActionVirtualLanPriorityCodePoint ofa =
416 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
417 openFlowActions.add(ofa);
418 actionsLen += ofa.getLength();
419 }
420
421 if (actionStripVlan != null) {
422 if (actionStripVlan.stripVlan() == true) {
423 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
424 openFlowActions.add(ofa);
425 actionsLen += ofa.getLength();
426 }
427 }
428
429 if (actionSetEthernetSrcAddr != null) {
430 OFActionDataLayerSource ofa =
431 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
432 openFlowActions.add(ofa);
433 actionsLen += ofa.getLength();
434 }
435
436 if (actionSetEthernetDstAddr != null) {
437 OFActionDataLayerDestination ofa =
438 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
439 openFlowActions.add(ofa);
440 actionsLen += ofa.getLength();
441 }
442
443 if (actionSetIPv4SrcAddr != null) {
444 OFActionNetworkLayerSource ofa =
445 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
446 openFlowActions.add(ofa);
447 actionsLen += ofa.getLength();
448 }
449
450 if (actionSetIPv4DstAddr != null) {
451 OFActionNetworkLayerDestination ofa =
452 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
453 openFlowActions.add(ofa);
454 actionsLen += ofa.getLength();
455 }
456
457 if (actionSetIpToS != null) {
458 OFActionNetworkTypeOfService ofa =
459 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
460 openFlowActions.add(ofa);
461 actionsLen += ofa.getLength();
462 }
463
464 if (actionSetTcpUdpSrcPort != null) {
465 OFActionTransportLayerSource ofa =
466 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
467 openFlowActions.add(ofa);
468 actionsLen += ofa.getLength();
469 }
470
471 if (actionSetTcpUdpDstPort != null) {
472 OFActionTransportLayerDestination ofa =
473 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
474 openFlowActions.add(ofa);
475 actionsLen += ofa.getLength();
476 }
477
478 if (actionEnqueue != null) {
479 OFActionEnqueue ofa =
480 new OFActionEnqueue(actionEnqueue.port().value(),
481 actionEnqueue.queueId());
482 openFlowActions.add(ofa);
483 actionsLen += ofa.getLength();
484 }
485 }
486
487 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
488 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
489 .setPriority(PRIORITY_DEFAULT)
490 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
491 .setCookie(cookie)
492 .setCommand(flowModCommand)
493 .setMatch(match)
494 .setActions(openFlowActions)
495 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
496 fm.setOutPort(OFPort.OFPP_NONE.getValue());
497 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
498 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
499 if (actionOutputPort != null)
500 fm.setOutPort(actionOutputPort);
501 }
502
503 //
504 // TODO: Set the following flag
505 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
506 // See method ForwardingBase::pushRoute()
507 //
508
509 //
510 // Write the message to the switch
511 //
512 log.debug("MEASUREMENT: Installing flow entry " + userState +
513 " into switch DPID: " +
514 sw.getStringId() +
515 " flowEntryId: " + flowEntryId.toString() +
516 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
517 " inPort: " + matchInPort + " outPort: " + actionOutputPort
518 );
519 send(sw,fm);
520 //
521 // TODO: We should use the OpenFlow Barrier mechanism
522 // to check for errors, and update the SwitchState
523 // for a flow entry after the Barrier message is
524 // is received.
525 //
526 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
527
528 return true;
529 }
530
531 private SwitchQueue getQueue(IOFSwitch sw) {
532 if (sw == null) {
533 return null;
534 }
535
536 return queues.get(sw);
537 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700538}