blob: cb47f0b666c0d9caf02f629fa3fbc1cc00dc8cde [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
6import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08008import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080010import java.util.Set;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011
Naoki Shiota7d0cf272013-11-05 10:18:12 -080012import org.openflow.protocol.*;
13import org.openflow.protocol.action.*;
14import org.openflow.protocol.factory.BasicFactory;
15import org.slf4j.Logger;
16import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070017
18import net.floodlightcontroller.core.FloodlightContext;
19import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080020import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import net.floodlightcontroller.util.OFMessageDamper;
22import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowEntry;
23import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IFlowPath;
24import net.onrc.onos.ofcontroller.util.FlowEntryAction;
25import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080026import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080027import net.onrc.onos.ofcontroller.util.FlowEntryActions;
28import net.onrc.onos.ofcontroller.util.FlowEntryId;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080029import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
30import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
31import net.onrc.onos.ofcontroller.util.FlowPath;
32import net.onrc.onos.ofcontroller.util.IPv4Net;
33import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070034
35/**
36 * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
37 * FlowPusher controls the rate of sending flow_mods so that connection doesn't overflow.
38 * @author Naoki Shiota
39 *
40 */
41public class FlowPusher {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080042 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
43
44 // NOTE: Below are moved from FlowManager.
45 // TODO: Values copied from elsewhere (class LearningSwitch).
46 // The local copy should go away!
47 //
48 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
49 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080050
51 protected static final long SLEEP_MILLI_SEC = 3;
52 protected static final int SLEEP_NANO_SEC = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080053
54 public static final short PRIORITY_DEFAULT = 100;
55 public static final short FLOWMOD_DEFAULT_IDLE_TIMEOUT = 0; // infinity
56 public static final short FLOWMOD_DEFAULT_HARD_TIMEOUT = 0; // infinite
57
58 public enum QueueState {
59 READY,
60 SUSPENDED,
61 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070062
Naoki Shiota7d0cf272013-11-05 10:18:12 -080063 private class SwitchQueue extends ArrayDeque<OFMessage> {
64 QueueState state;
65
66 // Max rate of sending message (bytes/sec). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080067 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070068 long last_sent_time = 0;
69 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080070
71 /**
72 * Check if sending rate is within the rate
73 * @param current Current time
74 * @return true if within the rate
75 */
76 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -080077 if (max_rate == 0) {
78 // no limitation
79 return true;
80 }
81
Naoki Shiota7d0cf272013-11-05 10:18:12 -080082 long rate = last_sent_size / (current - last_sent_time);
83
84 if (rate < max_rate) {
85 return true;
86 } else {
87 return false;
88 }
89 }
90
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080091 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080092 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080093 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080094 }
95
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070096 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097
Naoki Shiota7d0cf272013-11-05 10:18:12 -080098 private OFMessageDamper messageDamper;
Naoki Shiotacf1acca2013-10-31 11:40:32 -070099
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800100 private FloodlightContext context = null;
101 private BasicFactory factory = null;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800102 private Map<Long, FlowPusherProcess> threadMap = null;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800103
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800104 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800105
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700106 private class FlowPusherProcess implements Runnable {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800107 private Map<IOFSwitch,SwitchQueue> queues
108 = new HashMap<IOFSwitch,SwitchQueue>();
109
110 private boolean isStopped = false;
111 private boolean isMsgAdded = false;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800112
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700113 @Override
114 public void run() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800115 log.debug("Begin Flow Pusher Process");
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700116
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700117 while (true) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800118 Set< Map.Entry<IOFSwitch,SwitchQueue> > entries;
119 synchronized (queues) {
120 entries = queues.entrySet();
121 }
122
123 // Set taint flag to false at this moment.
124 isMsgAdded = false;
125
126 for (Map.Entry<IOFSwitch,SwitchQueue> entry : entries) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800127 IOFSwitch sw = entry.getKey();
128 SwitchQueue queue = entry.getValue();
129
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700130 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800131 if (sw == null || queue == null ||
132 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700133 continue;
134 }
135
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800136 // check sending rate and determine it to be sent or not
137 long current_time = System.nanoTime();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800138 long size = 0;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800139
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140 synchronized (queue) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800141 if (queue.isSendable(current_time)) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800142 // TODO limit number of messages to be sent at once
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800143 while (! queue.isEmpty()) {
144 OFMessage msg = queue.poll();
145
146 // if need to send, call IOFSwitch#write()
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800147 try {
148 messageDamper.write(sw, msg, context);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800149 log.debug("Pusher sends message : {}", msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800150 size += msg.getLength();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800151 } catch (IOException e) {
152 // TODO Auto-generated catch block
153 e.printStackTrace();
154 }
155 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800156 sw.flush();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800157 queue.logSentData(current_time, size);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700158 }
159 }
160 }
161
162 // sleep while all queues are empty
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800163 while (! (isMsgAdded || isStopped)) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800164 try {
165 Thread.sleep(SLEEP_MILLI_SEC, SLEEP_NANO_SEC);
166 } catch (InterruptedException e) {
167 // TODO Auto-generated catch block
168 e.printStackTrace();
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800169 log.error("Thread.sleep failed");
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800170 }
171 }
172
173 log.debug("Exit sleep loop.");
174
175 if (isStopped) {
176 log.debug("Pusher Process finished.");
177 return;
178 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800179
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700180 }
181 }
182 }
183
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800184 public FlowPusher() {
185
186 }
187
188 public FlowPusher(int number_thread) {
189 this.number_thread = number_thread;
190 }
191
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800192 public void init(FloodlightContext context, BasicFactory factory, OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700193 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800194 this.factory = factory;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800195
196 if (damper != null) {
197 messageDamper = damper;
198 } else {
199 // use default value
200 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
201 EnumSet.of(OFType.FLOW_MOD),
202 OFMESSAGE_DAMPER_TIMEOUT);
203 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700204 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800205
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800206 /**
207 * Begin processing queue.
208 */
209 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800210 if (factory == null) {
211 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800212 return;
213 }
214
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800215 threadMap = new HashMap<Long,FlowPusherProcess>();
216 for (long i = 0; i < number_thread; ++i) {
217 FlowPusherProcess runnable = new FlowPusherProcess();
218 threadMap.put(i, runnable);
219
220 Thread thread = new Thread(runnable);
221 thread.start();
222 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700223 }
224
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800225 /**
226 * Suspend processing a queue related to given switch.
227 * @param sw
228 */
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800229 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800230 SwitchQueue queue = getQueue(sw);
231
232 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800233 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800234 }
235
236 synchronized (queue) {
237 if (queue.state == QueueState.READY) {
238 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800239 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800240 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800241 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800242 }
243 }
244
245 /**
246 * Resume processing a queue related to given switch.
247 */
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800248 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800249 SwitchQueue queue = getQueue(sw);
250
251 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800252 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800253 }
254
255 synchronized (queue) {
256 if (queue.state == QueueState.SUSPENDED) {
257 queue.state = QueueState.READY;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800258 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800259 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800260 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800261 }
262 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800263
264 public boolean isSuspended(IOFSwitch sw) {
265 SwitchQueue queue = getQueue(sw);
266
267 if (queue == null) {
268 // TODO Is true suitable for this case?
269 return true;
270 }
271
272 return (queue.state == QueueState.SUSPENDED);
273 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800274
275 /**
276 * End processing queue and exit thread.
277 */
278 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800279 if (threadMap == null) {
280 return;
281 }
282
283 for (FlowPusherProcess runnable : threadMap.values()) {
284 if (! runnable.isStopped) {
285 runnable.isStopped = true;
286 }
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800287 }
288 }
289
290 public void setRate(IOFSwitch sw, long rate) {
291 SwitchQueue queue = getQueue(sw);
292 if (queue == null) {
293 return;
294 }
295
296 if (rate > 0) {
297 queue.max_rate = rate;
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700298 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700299 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700300
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800301 /**
302 * Add OFMessage to the queue related to given switch.
303 * @param sw
304 * @param msg
305 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800306 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800307 FlowPusherProcess proc = getProcess(sw);
308 SwitchQueue queue = proc.queues.get(sw);
309
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800310 if (queue == null) {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800311 queue = new SwitchQueue();
312 queue.state = QueueState.READY;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800313 synchronized (proc) {
314 proc.queues.put(sw, queue);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800315 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800316 }
317
318 synchronized (queue) {
319 queue.add(msg);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800320 log.debug("Message is pushed : {}", msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800321 }
322
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800323 proc.isMsgAdded = true;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800324
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800325 return true;
326 }
327
328 /**
329 * Create OFMessage from given flow information and add it to the queue.
330 * @param sw
331 * @param flowObj
332 * @param flowEntryObj
333 * @return
334 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800335 public boolean add(IOFSwitch sw, IFlowPath flowObj, IFlowEntry flowEntryObj) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800336 log.debug("sending : {}, {}", sw, flowObj);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800337 String flowEntryIdStr = flowEntryObj.getFlowEntryId();
338 if (flowEntryIdStr == null)
339 return false;
340 FlowEntryId flowEntryId = new FlowEntryId(flowEntryIdStr);
341 String userState = flowEntryObj.getUserState();
342 if (userState == null)
343 return false;
344
345 //
346 // Create the Open Flow Flow Modification Entry to push
347 //
348 OFFlowMod fm = (OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
349 long cookie = flowEntryId.value();
350
351 short flowModCommand = OFFlowMod.OFPFC_ADD;
352 if (userState.equals("FE_USER_ADD")) {
353 flowModCommand = OFFlowMod.OFPFC_ADD;
354 } else if (userState.equals("FE_USER_MODIFY")) {
355 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
356 } else if (userState.equals("FE_USER_DELETE")) {
357 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
358 } else {
359 // Unknown user state. Ignore the entry
360 log.debug("Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
361 flowEntryId.toString(), userState);
362 return false;
363 }
364
365 //
366 // Fetch the match conditions.
367 //
368 // NOTE: The Flow matching conditions common for all Flow Entries are
369 // used ONLY if a Flow Entry does NOT have the corresponding matching
370 // condition set.
371 //
372 OFMatch match = new OFMatch();
373 match.setWildcards(OFMatch.OFPFW_ALL);
374
375 // Match the Incoming Port
376 Short matchInPort = flowEntryObj.getMatchInPort();
377 if (matchInPort != null) {
378 match.setInputPort(matchInPort);
379 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
380 }
381
382 // Match the Source MAC address
383 String matchSrcMac = flowEntryObj.getMatchSrcMac();
384 if (matchSrcMac == null)
385 matchSrcMac = flowObj.getMatchSrcMac();
386 if (matchSrcMac != null) {
387 match.setDataLayerSource(matchSrcMac);
388 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
389 }
390
391 // Match the Destination MAC address
392 String matchDstMac = flowEntryObj.getMatchDstMac();
393 if (matchDstMac == null)
394 matchDstMac = flowObj.getMatchDstMac();
395 if (matchDstMac != null) {
396 match.setDataLayerDestination(matchDstMac);
397 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
398 }
399
400 // Match the Ethernet Frame Type
401 Short matchEthernetFrameType = flowEntryObj.getMatchEthernetFrameType();
402 if (matchEthernetFrameType == null)
403 matchEthernetFrameType = flowObj.getMatchEthernetFrameType();
404 if (matchEthernetFrameType != null) {
405 match.setDataLayerType(matchEthernetFrameType);
406 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
407 }
408
409 // Match the VLAN ID
410 Short matchVlanId = flowEntryObj.getMatchVlanId();
411 if (matchVlanId == null)
412 matchVlanId = flowObj.getMatchVlanId();
413 if (matchVlanId != null) {
414 match.setDataLayerVirtualLan(matchVlanId);
415 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
416 }
417
418 // Match the VLAN priority
419 Byte matchVlanPriority = flowEntryObj.getMatchVlanPriority();
420 if (matchVlanPriority == null)
421 matchVlanPriority = flowObj.getMatchVlanPriority();
422 if (matchVlanPriority != null) {
423 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
424 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN_PCP);
425 }
426
427 // Match the Source IPv4 Network prefix
428 String matchSrcIPv4Net = flowEntryObj.getMatchSrcIPv4Net();
429 if (matchSrcIPv4Net == null)
430 matchSrcIPv4Net = flowObj.getMatchSrcIPv4Net();
431 if (matchSrcIPv4Net != null) {
432 match.setFromCIDR(matchSrcIPv4Net, OFMatch.STR_NW_SRC);
433 }
434
435 // Match the Destination IPv4 Network prefix
436 String matchDstIPv4Net = flowEntryObj.getMatchDstIPv4Net();
437 if (matchDstIPv4Net == null)
438 matchDstIPv4Net = flowObj.getMatchDstIPv4Net();
439 if (matchDstIPv4Net != null) {
440 match.setFromCIDR(matchDstIPv4Net, OFMatch.STR_NW_DST);
441 }
442
443 // Match the IP protocol
444 Byte matchIpProto = flowEntryObj.getMatchIpProto();
445 if (matchIpProto == null)
446 matchIpProto = flowObj.getMatchIpProto();
447 if (matchIpProto != null) {
448 match.setNetworkProtocol(matchIpProto);
449 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
450 }
451
452 // Match the IP ToS (DSCP field, 6 bits)
453 Byte matchIpToS = flowEntryObj.getMatchIpToS();
454 if (matchIpToS == null)
455 matchIpToS = flowObj.getMatchIpToS();
456 if (matchIpToS != null) {
457 match.setNetworkTypeOfService(matchIpToS);
458 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
459 }
460
461 // Match the Source TCP/UDP port
462 Short matchSrcTcpUdpPort = flowEntryObj.getMatchSrcTcpUdpPort();
463 if (matchSrcTcpUdpPort == null)
464 matchSrcTcpUdpPort = flowObj.getMatchSrcTcpUdpPort();
465 if (matchSrcTcpUdpPort != null) {
466 match.setTransportSource(matchSrcTcpUdpPort);
467 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
468 }
469
470 // Match the Destination TCP/UDP port
471 Short matchDstTcpUdpPort = flowEntryObj.getMatchDstTcpUdpPort();
472 if (matchDstTcpUdpPort == null)
473 matchDstTcpUdpPort = flowObj.getMatchDstTcpUdpPort();
474 if (matchDstTcpUdpPort != null) {
475 match.setTransportDestination(matchDstTcpUdpPort);
476 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
477 }
478
479 //
480 // Fetch the actions
481 //
482 Short actionOutputPort = null;
483 List<OFAction> openFlowActions = new ArrayList<OFAction>();
484 int actionsLen = 0;
485 FlowEntryActions flowEntryActions = null;
486 String actionsStr = flowEntryObj.getActions();
487 if (actionsStr != null)
488 flowEntryActions = new FlowEntryActions(actionsStr);
489 else
490 flowEntryActions = new FlowEntryActions();
491 for (FlowEntryAction action : flowEntryActions.actions()) {
492 ActionOutput actionOutput = action.actionOutput();
493 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
494 ActionSetVlanPriority actionSetVlanPriority = action.actionSetVlanPriority();
495 ActionStripVlan actionStripVlan = action.actionStripVlan();
496 ActionSetEthernetAddr actionSetEthernetSrcAddr = action.actionSetEthernetSrcAddr();
497 ActionSetEthernetAddr actionSetEthernetDstAddr = action.actionSetEthernetDstAddr();
498 ActionSetIPv4Addr actionSetIPv4SrcAddr = action.actionSetIPv4SrcAddr();
499 ActionSetIPv4Addr actionSetIPv4DstAddr = action.actionSetIPv4DstAddr();
500 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
501 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action.actionSetTcpUdpSrcPort();
502 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action.actionSetTcpUdpDstPort();
503 ActionEnqueue actionEnqueue = action.actionEnqueue();
504
505 if (actionOutput != null) {
506 actionOutputPort = actionOutput.port().value();
507 // XXX: The max length is hard-coded for now
508 OFActionOutput ofa =
509 new OFActionOutput(actionOutput.port().value(),
510 (short)0xffff);
511 openFlowActions.add(ofa);
512 actionsLen += ofa.getLength();
513 }
514
515 if (actionSetVlanId != null) {
516 OFActionVirtualLanIdentifier ofa =
517 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
518 openFlowActions.add(ofa);
519 actionsLen += ofa.getLength();
520 }
521
522 if (actionSetVlanPriority != null) {
523 OFActionVirtualLanPriorityCodePoint ofa =
524 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
525 openFlowActions.add(ofa);
526 actionsLen += ofa.getLength();
527 }
528
529 if (actionStripVlan != null) {
530 if (actionStripVlan.stripVlan() == true) {
531 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
532 openFlowActions.add(ofa);
533 actionsLen += ofa.getLength();
534 }
535 }
536
537 if (actionSetEthernetSrcAddr != null) {
538 OFActionDataLayerSource ofa =
539 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
540 openFlowActions.add(ofa);
541 actionsLen += ofa.getLength();
542 }
543
544 if (actionSetEthernetDstAddr != null) {
545 OFActionDataLayerDestination ofa =
546 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
547 openFlowActions.add(ofa);
548 actionsLen += ofa.getLength();
549 }
550
551 if (actionSetIPv4SrcAddr != null) {
552 OFActionNetworkLayerSource ofa =
553 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
554 openFlowActions.add(ofa);
555 actionsLen += ofa.getLength();
556 }
557
558 if (actionSetIPv4DstAddr != null) {
559 OFActionNetworkLayerDestination ofa =
560 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
561 openFlowActions.add(ofa);
562 actionsLen += ofa.getLength();
563 }
564
565 if (actionSetIpToS != null) {
566 OFActionNetworkTypeOfService ofa =
567 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
568 openFlowActions.add(ofa);
569 actionsLen += ofa.getLength();
570 }
571
572 if (actionSetTcpUdpSrcPort != null) {
573 OFActionTransportLayerSource ofa =
574 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
575 openFlowActions.add(ofa);
576 actionsLen += ofa.getLength();
577 }
578
579 if (actionSetTcpUdpDstPort != null) {
580 OFActionTransportLayerDestination ofa =
581 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
582 openFlowActions.add(ofa);
583 actionsLen += ofa.getLength();
584 }
585
586 if (actionEnqueue != null) {
587 OFActionEnqueue ofa =
588 new OFActionEnqueue(actionEnqueue.port().value(),
589 actionEnqueue.queueId());
590 openFlowActions.add(ofa);
591 actionsLen += ofa.getLength();
592 }
593 }
594
595 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
596 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
597 .setPriority(PRIORITY_DEFAULT)
598 .setBufferId(OFPacketOut.BUFFER_ID_NONE)
599 .setCookie(cookie)
600 .setCommand(flowModCommand)
601 .setMatch(match)
602 .setActions(openFlowActions)
603 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
604 fm.setOutPort(OFPort.OFPP_NONE.getValue());
605 if ((flowModCommand == OFFlowMod.OFPFC_DELETE) ||
606 (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
607 if (actionOutputPort != null)
608 fm.setOutPort(actionOutputPort);
609 }
610
611 //
612 // TODO: Set the following flag
613 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
614 // See method ForwardingBase::pushRoute()
615 //
616
617 //
618 // Write the message to the switch
619 //
620 log.debug("MEASUREMENT: Installing flow entry " + userState +
621 " into switch DPID: " +
622 sw.getStringId() +
623 " flowEntryId: " + flowEntryId.toString() +
624 " srcMac: " + matchSrcMac + " dstMac: " + matchDstMac +
625 " inPort: " + matchInPort + " outPort: " + actionOutputPort
626 );
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800627 add(sw,fm);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800628 //
629 // TODO: We should use the OpenFlow Barrier mechanism
630 // to check for errors, and update the SwitchState
631 // for a flow entry after the Barrier message is
632 // is received.
633 //
634 flowEntryObj.setSwitchState("FE_SWITCH_UPDATED");
635
636 return true;
637 }
638
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800639 public boolean add(IOFSwitch sw, FlowPath flowPath, FlowEntry flowEntry) {
640 //
641 // Create the OpenFlow Flow Modification Entry to push
642 //
643 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
644 long cookie = flowEntry.flowEntryId().value();
645
646 short flowModCommand = OFFlowMod.OFPFC_ADD;
647 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
648 flowModCommand = OFFlowMod.OFPFC_ADD;
649 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
650 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
651 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
652 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
653 } else {
654 // Unknown user state. Ignore the entry
655 log.debug(
656 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
657 flowEntry.flowEntryId().toString(),
658 flowEntry.flowEntryUserState());
659 return false;
660 }
661
662 //
663 // Fetch the match conditions.
664 //
665 // NOTE: The Flow matching conditions common for all Flow Entries are
666 // used ONLY if a Flow Entry does NOT have the corresponding matching
667 // condition set.
668 //
669 OFMatch match = new OFMatch();
670 match.setWildcards(OFMatch.OFPFW_ALL);
671 FlowEntryMatch flowPathMatch = flowPath.flowEntryMatch();
672 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
673
674 // Match the Incoming Port
675 Port matchInPort = flowEntryMatch.inPort();
676 if (matchInPort != null) {
677 match.setInputPort(matchInPort.value());
678 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
679 }
680
681 // Match the Source MAC address
682 MACAddress matchSrcMac = flowEntryMatch.srcMac();
683 if ((matchSrcMac == null) && (flowPathMatch != null)) {
684 matchSrcMac = flowPathMatch.srcMac();
685 }
686 if (matchSrcMac != null) {
687 match.setDataLayerSource(matchSrcMac.toString());
688 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
689 }
690
691 // Match the Destination MAC address
692 MACAddress matchDstMac = flowEntryMatch.dstMac();
693 if ((matchDstMac == null) && (flowPathMatch != null)) {
694 matchDstMac = flowPathMatch.dstMac();
695 }
696 if (matchDstMac != null) {
697 match.setDataLayerDestination(matchDstMac.toString());
698 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
699 }
700
701 // Match the Ethernet Frame Type
702 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
703 if ((matchEthernetFrameType == null) && (flowPathMatch != null)) {
704 matchEthernetFrameType = flowPathMatch.ethernetFrameType();
705 }
706 if (matchEthernetFrameType != null) {
707 match.setDataLayerType(matchEthernetFrameType);
708 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
709 }
710
711 // Match the VLAN ID
712 Short matchVlanId = flowEntryMatch.vlanId();
713 if ((matchVlanId == null) && (flowPathMatch != null)) {
714 matchVlanId = flowPathMatch.vlanId();
715 }
716 if (matchVlanId != null) {
717 match.setDataLayerVirtualLan(matchVlanId);
718 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
719 }
720
721 // Match the VLAN priority
722 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
723 if ((matchVlanPriority == null) && (flowPathMatch != null)) {
724 matchVlanPriority = flowPathMatch.vlanPriority();
725 }
726 if (matchVlanPriority != null) {
727 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
728 match.setWildcards(match.getWildcards()
729 & ~OFMatch.OFPFW_DL_VLAN_PCP);
730 }
731
732 // Match the Source IPv4 Network prefix
733 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
734 if ((matchSrcIPv4Net == null) && (flowPathMatch != null)) {
735 matchSrcIPv4Net = flowPathMatch.srcIPv4Net();
736 }
737 if (matchSrcIPv4Net != null) {
738 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
739 }
740
741 // Natch the Destination IPv4 Network prefix
742 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
743 if ((matchDstIPv4Net == null) && (flowPathMatch != null)) {
744 matchDstIPv4Net = flowPathMatch.dstIPv4Net();
745 }
746 if (matchDstIPv4Net != null) {
747 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
748 }
749
750 // Match the IP protocol
751 Byte matchIpProto = flowEntryMatch.ipProto();
752 if ((matchIpProto == null) && (flowPathMatch != null)) {
753 matchIpProto = flowPathMatch.ipProto();
754 }
755 if (matchIpProto != null) {
756 match.setNetworkProtocol(matchIpProto);
757 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
758 }
759
760 // Match the IP ToS (DSCP field, 6 bits)
761 Byte matchIpToS = flowEntryMatch.ipToS();
762 if ((matchIpToS == null) && (flowPathMatch != null)) {
763 matchIpToS = flowPathMatch.ipToS();
764 }
765 if (matchIpToS != null) {
766 match.setNetworkTypeOfService(matchIpToS);
767 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
768 }
769
770 // Match the Source TCP/UDP port
771 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
772 if ((matchSrcTcpUdpPort == null) && (flowPathMatch != null)) {
773 matchSrcTcpUdpPort = flowPathMatch.srcTcpUdpPort();
774 }
775 if (matchSrcTcpUdpPort != null) {
776 match.setTransportSource(matchSrcTcpUdpPort);
777 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
778 }
779
780 // Match the Destination TCP/UDP port
781 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
782 if ((matchDstTcpUdpPort == null) && (flowPathMatch != null)) {
783 matchDstTcpUdpPort = flowPathMatch.dstTcpUdpPort();
784 }
785 if (matchDstTcpUdpPort != null) {
786 match.setTransportDestination(matchDstTcpUdpPort);
787 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
788 }
789
790 //
791 // Fetch the actions
792 //
793 Short actionOutputPort = null;
794 List<OFAction> openFlowActions = new ArrayList<OFAction>();
795 int actionsLen = 0;
796 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
797 //
798 for (FlowEntryAction action : flowEntryActions.actions()) {
799 ActionOutput actionOutput = action.actionOutput();
800 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
801 ActionSetVlanPriority actionSetVlanPriority = action
802 .actionSetVlanPriority();
803 ActionStripVlan actionStripVlan = action.actionStripVlan();
804 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
805 .actionSetEthernetSrcAddr();
806 ActionSetEthernetAddr actionSetEthernetDstAddr = action
807 .actionSetEthernetDstAddr();
808 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
809 .actionSetIPv4SrcAddr();
810 ActionSetIPv4Addr actionSetIPv4DstAddr = action
811 .actionSetIPv4DstAddr();
812 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
813 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
814 .actionSetTcpUdpSrcPort();
815 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
816 .actionSetTcpUdpDstPort();
817 ActionEnqueue actionEnqueue = action.actionEnqueue();
818
819 if (actionOutput != null) {
820 actionOutputPort = actionOutput.port().value();
821 // XXX: The max length is hard-coded for now
822 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
823 .value(), (short) 0xffff);
824 openFlowActions.add(ofa);
825 actionsLen += ofa.getLength();
826 }
827
828 if (actionSetVlanId != null) {
829 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
830 actionSetVlanId.vlanId());
831 openFlowActions.add(ofa);
832 actionsLen += ofa.getLength();
833 }
834
835 if (actionSetVlanPriority != null) {
836 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
837 actionSetVlanPriority.vlanPriority());
838 openFlowActions.add(ofa);
839 actionsLen += ofa.getLength();
840 }
841
842 if (actionStripVlan != null) {
843 if (actionStripVlan.stripVlan() == true) {
844 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
845 openFlowActions.add(ofa);
846 actionsLen += ofa.getLength();
847 }
848 }
849
850 if (actionSetEthernetSrcAddr != null) {
851 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
852 actionSetEthernetSrcAddr.addr().toBytes());
853 openFlowActions.add(ofa);
854 actionsLen += ofa.getLength();
855 }
856
857 if (actionSetEthernetDstAddr != null) {
858 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
859 actionSetEthernetDstAddr.addr().toBytes());
860 openFlowActions.add(ofa);
861 actionsLen += ofa.getLength();
862 }
863
864 if (actionSetIPv4SrcAddr != null) {
865 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
866 actionSetIPv4SrcAddr.addr().value());
867 openFlowActions.add(ofa);
868 actionsLen += ofa.getLength();
869 }
870
871 if (actionSetIPv4DstAddr != null) {
872 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
873 actionSetIPv4DstAddr.addr().value());
874 openFlowActions.add(ofa);
875 actionsLen += ofa.getLength();
876 }
877
878 if (actionSetIpToS != null) {
879 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
880 actionSetIpToS.ipToS());
881 openFlowActions.add(ofa);
882 actionsLen += ofa.getLength();
883 }
884
885 if (actionSetTcpUdpSrcPort != null) {
886 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
887 actionSetTcpUdpSrcPort.port());
888 openFlowActions.add(ofa);
889 actionsLen += ofa.getLength();
890 }
891
892 if (actionSetTcpUdpDstPort != null) {
893 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
894 actionSetTcpUdpDstPort.port());
895 openFlowActions.add(ofa);
896 actionsLen += ofa.getLength();
897 }
898
899 if (actionEnqueue != null) {
900 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
901 .value(), actionEnqueue.queueId());
902 openFlowActions.add(ofa);
903 actionsLen += ofa.getLength();
904 }
905 }
906
907 fm.setIdleTimeout(FLOWMOD_DEFAULT_IDLE_TIMEOUT)
908 .setHardTimeout(FLOWMOD_DEFAULT_HARD_TIMEOUT)
909 .setPriority(PRIORITY_DEFAULT)
910 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
911 .setCommand(flowModCommand).setMatch(match)
912 .setActions(openFlowActions)
913 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
914 fm.setOutPort(OFPort.OFPP_NONE.getValue());
915 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
916 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
917 if (actionOutputPort != null)
918 fm.setOutPort(actionOutputPort);
919 }
920
921 //
922 // TODO: Set the following flag
923 // fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
924 // See method ForwardingBase::pushRoute()
925 //
926
927 //
928 // Write the message to the switch
929 //
930 log.debug("MEASUREMENT: Installing flow entry "
931 + flowEntry.flowEntryUserState() + " into switch DPID: "
932 + sw.getStringId() + " flowEntryId: "
933 + flowEntry.flowEntryId().toString() + " srcMac: "
934 + matchSrcMac + " dstMac: " + matchDstMac + " inPort: "
935 + matchInPort + " outPort: " + actionOutputPort);
936
937 //
938 // TODO: We should use the OpenFlow Barrier mechanism
939 // to check for errors, and update the SwitchState
940 // for a flow entry after the Barrier message is
941 // is received.
942 //
943 // TODO: The FlowEntry Object in Titan should be set
944 // to FE_SWITCH_UPDATED.
945 //
946
947 return add(sw,fm);
948 }
949
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800950 private SwitchQueue getQueue(IOFSwitch sw) {
951 if (sw == null) {
952 return null;
953 }
954
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800955 return getProcess(sw).queues.get(sw);
956 }
957
958 private long getHash(IOFSwitch sw) {
959 // TODO should consider equalization algorithm
960 return sw.getId() % number_thread;
961 }
962
963 private FlowPusherProcess getProcess(IOFSwitch sw) {
964 long hash = getHash(sw);
965
966 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800967 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700968}