blob: 1c0802a3fc0cc4ae0643b5114a9039db7a9513b4 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import org.openflow.protocol.*;
22import org.openflow.protocol.action.*;
23import org.openflow.protocol.factory.BasicFactory;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026
27import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080028import net.floodlightcontroller.core.IFloodlightProviderService;
29import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070030import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080031import net.floodlightcontroller.core.internal.OFMessageFuture;
32import net.floodlightcontroller.core.module.FloodlightModuleContext;
33import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryAction;
37import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080038import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080039import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080040import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
41import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080042import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080043import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080044import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045
46/**
Naoki Shiotab485d412013-11-26 12:04:19 -080047 * FlowPusher is a implementation of FlowPusherService.
48 * FlowPusher assigns one message queue instance for each one switch.
49 * Number of message processing threads is configurable by constructor, and
Ray Milkey8e5170e2014-04-02 12:09:55 -070050 * one thread can handle multiple message queues. Each queue will be assigned to
Naoki Shiotab485d412013-11-26 12:04:19 -080051 * a thread according to hash function defined by getHash().
52 * Each processing thread reads messages from queues and sends it to switches
53 * in round-robin. Processing thread also calculates rate of sending to suppress
54 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070055 *
Ray Milkey8e5170e2014-04-02 12:09:55 -070056 * @author Naoki Shiota
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070057 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080058public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070060 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080061
Naoki Shiota7d0cf272013-11-05 10:18:12 -080062 // TODO: Values copied from elsewhere (class LearningSwitch).
63 // The local copy should go away!
64 //
65 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
Ray Milkey8e5170e2014-04-02 12:09:55 -070066 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
67
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080068 // Number of messages sent to switch at once
69 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080070
Ray Milkey8e5170e2014-04-02 12:09:55 -070071 private static class SwitchQueueEntry {
72 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070073
Ray Milkey8e5170e2014-04-02 12:09:55 -070074 public SwitchQueueEntry(OFMessage msg) {
75 this.msg = msg;
76 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070077
Ray Milkey8e5170e2014-04-02 12:09:55 -070078 public OFMessage getOFMessage() {
79 return msg;
80 }
81 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070082
Ray Milkey8e5170e2014-04-02 12:09:55 -070083 /**
84 * SwitchQueue represents message queue attached to a switch.
85 * This consists of queue itself and variables used for limiting sending rate.
86 *
87 * @author Naoki Shiota
88 */
89 private class SwitchQueue {
90 List<Queue<SwitchQueueEntry>> raw_queues;
91 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080092
Ray Milkey8e5170e2014-04-02 12:09:55 -070093 // Max rate of sending message (bytes/ms). 0 implies no limitation.
94 long max_rate = 0; // 0 indicates no limitation
95 long last_sent_time = 0;
96 long last_sent_size = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070097
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 // "To be deleted" flag
99 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800100
Ray Milkey8e5170e2014-04-02 12:09:55 -0700101 SwitchQueue() {
102 raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
103 MsgPriority.values().length);
104 for (int i = 0; i < MsgPriority.values().length; ++i) {
105 raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
106 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800107
Ray Milkey8e5170e2014-04-02 12:09:55 -0700108 state = QueueState.READY;
109 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800110
Ray Milkey8e5170e2014-04-02 12:09:55 -0700111 /**
112 * Check if sending rate is within the rate
113 *
114 * @param current Current time
115 * @return true if within the rate
116 */
117 boolean isSendable(long current) {
118 if (max_rate == 0) {
119 // no limitation
120 return true;
121 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800122
Ray Milkey8e5170e2014-04-02 12:09:55 -0700123 if (current == last_sent_time) {
124 return false;
125 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800126
Ray Milkey8e5170e2014-04-02 12:09:55 -0700127 // Check if sufficient time (from aspect of rate) elapsed or not.
128 long rate = last_sent_size / (current - last_sent_time);
129 return (rate < max_rate);
130 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800131
Ray Milkey8e5170e2014-04-02 12:09:55 -0700132 /**
133 * Log time and size of last sent data.
134 *
135 * @param current Time to be sent.
136 * @param size Size of sent data (in bytes).
137 */
138 void logSentData(long current, long size) {
139 last_sent_time = current;
140 last_sent_size = size;
141 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700142
Ray Milkey8e5170e2014-04-02 12:09:55 -0700143 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
144 Queue<SwitchQueueEntry> queue = getQueue(priority);
145 if (queue == null) {
146 log.error("Unexpected priority : ", priority);
147 return false;
148 }
149 return queue.add(entry);
150 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800151
Ray Milkey8e5170e2014-04-02 12:09:55 -0700152 /**
153 * Poll single appropriate entry object according to QueueState.
154 *
155 * @return Entry object.
156 */
157 SwitchQueueEntry poll() {
158 switch (state) {
159 case READY: {
160 for (int i = 0; i < raw_queues.size(); ++i) {
161 SwitchQueueEntry entry = raw_queues.get(i).poll();
162 if (entry != null) {
163 return entry;
164 }
165 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800166
Ray Milkey8e5170e2014-04-02 12:09:55 -0700167 return null;
168 }
169 case SUSPENDED: {
170 // Only polling from high priority queue
171 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
172 return entry;
173 }
174 default:
175 log.error("Unexpected QueueState : ", state);
176 return null;
177 }
178 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800179
Ray Milkey8e5170e2014-04-02 12:09:55 -0700180 /**
181 * Check if this object has any messages in the queues to be sent
182 *
183 * @return True if there are some messages to be sent.
184 */
185 boolean hasMessageToSend() {
186 switch (state) {
187 case READY:
188 for (Queue<SwitchQueueEntry> queue : raw_queues) {
189 if (!queue.isEmpty()) {
190 return true;
191 }
192 }
193 break;
194 case SUSPENDED:
195 // Only checking high priority queue
196 return (!getQueue(MsgPriority.HIGH).isEmpty());
197 default:
198 log.error("Unexpected QueueState : ", state);
199 return false;
200 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800201
Ray Milkey8e5170e2014-04-02 12:09:55 -0700202 return false;
203 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800204
Ray Milkey8e5170e2014-04-02 12:09:55 -0700205 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
206 return raw_queues.get(priority.ordinal());
207 }
208 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800209
Ray Milkey8e5170e2014-04-02 12:09:55 -0700210 /**
211 * BarrierInfo holds information to specify barrier message sent to switch.
212 *
213 * @author Naoki
214 */
215 private static class BarrierInfo {
216 final long dpid;
217 final int xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800218
Ray Milkey8e5170e2014-04-02 12:09:55 -0700219 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
220 return new BarrierInfo(sw.getId(), req.getXid());
221 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800222
Ray Milkey8e5170e2014-04-02 12:09:55 -0700223 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
224 return new BarrierInfo(sw.getId(), rpy.getXid());
225 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800226
Ray Milkey8e5170e2014-04-02 12:09:55 -0700227 private BarrierInfo(long dpid, int xid) {
228 this.dpid = dpid;
229 this.xid = xid;
230 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800231
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 // Auto generated code by Eclipse
233 @Override
234 public int hashCode() {
235 final int prime = 31;
236 int result = 1;
237 result = prime * result + (int) (dpid ^ (dpid >>> 32));
238 result = prime * result + xid;
239 return result;
240 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800241
Ray Milkey8e5170e2014-04-02 12:09:55 -0700242 @Override
243 public boolean equals(Object obj) {
244 if (this == obj)
245 return true;
246 if (obj == null)
247 return false;
248 if (getClass() != obj.getClass())
249 return false;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800250
Ray Milkey8e5170e2014-04-02 12:09:55 -0700251 BarrierInfo other = (BarrierInfo) obj;
252 return (this.dpid == other.dpid) && (this.xid == other.xid);
253 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800254
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800255
Ray Milkey8e5170e2014-04-02 12:09:55 -0700256 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800257
Ray Milkey8e5170e2014-04-02 12:09:55 -0700258 private OFMessageDamper messageDamper = null;
259 private IThreadPoolService threadPool = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800260
Ray Milkey8e5170e2014-04-02 12:09:55 -0700261 private FloodlightContext context = null;
262 private BasicFactory factory = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800263
Ray Milkey8e5170e2014-04-02 12:09:55 -0700264 // Map of threads versus dpid
265 private Map<Long, FlowPusherThread> threadMap = null;
266 // Map from (DPID and transaction ID) to Future objects.
267 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
268 = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800269
Ray Milkey8e5170e2014-04-02 12:09:55 -0700270 private int number_thread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800271
Ray Milkey8e5170e2014-04-02 12:09:55 -0700272 /**
273 * Main thread that reads messages from queues and sends them to switches.
274 *
275 * @author Naoki Shiota
276 */
277 private class FlowPusherThread extends Thread {
278 private Map<IOFSwitch, SwitchQueue> assignedQueues
279 = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800280
Ray Milkey8e5170e2014-04-02 12:09:55 -0700281 final Lock queuingLock = new ReentrantLock();
282 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800283
Ray Milkey8e5170e2014-04-02 12:09:55 -0700284 @Override
285 public void run() {
286 this.setName("FlowPusherThread " + this.getId());
287 while (true) {
288 while (!queuesHasMessageToSend()) {
289 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800290
Ray Milkey8e5170e2014-04-02 12:09:55 -0700291 try {
292 // wait for message pushed to queue
293 messagePushed.await();
294 } catch (InterruptedException e) {
295 // Interrupted to be shut down (not an error)
296 log.debug("FlowPusherThread is interrupted");
297 return;
298 } finally {
299 queuingLock.unlock();
300 }
301 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800302
Ray Milkey8e5170e2014-04-02 12:09:55 -0700303 // for safety of concurrent access, copy set of key objects
304 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
305 for (IOFSwitch sw : assignedQueues.keySet()) {
306 keys.add(sw);
307 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800308
Ray Milkey8e5170e2014-04-02 12:09:55 -0700309 for (IOFSwitch sw : keys) {
310 SwitchQueue queue = assignedQueues.get(sw);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800311
Ray Milkey8e5170e2014-04-02 12:09:55 -0700312 if (sw == null || queue == null) {
313 continue;
314 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800315
Ray Milkey8e5170e2014-04-02 12:09:55 -0700316 synchronized (queue) {
317 processQueue(sw, queue, MAX_MESSAGE_SEND);
318 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
319 // remove queue if flagged to be.
320 assignedQueues.remove(sw);
321 }
322 }
323 }
324 }
325 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800326
Ray Milkey8e5170e2014-04-02 12:09:55 -0700327 /**
328 * Read messages from queue and send them to the switch.
329 * If number of messages excess the limit, stop sending messages.
330 *
331 * @param sw Switch to which messages will be sent.
332 * @param queue Queue of messages.
333 * @param max_msg Limitation of number of messages to be sent. If set to 0,
334 * all messages in queue will be sent.
335 */
336 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
337 // check sending rate and determine it to be sent or not
338 long current_time = System.currentTimeMillis();
339 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800340
Ray Milkey8e5170e2014-04-02 12:09:55 -0700341 if (queue.isSendable(current_time)) {
342 int i = 0;
343 while (queue.hasMessageToSend()) {
344 // Number of messages excess the limit
345 if (0 < max_msg && max_msg <= i) {
346 break;
347 }
348 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800349
Ray Milkey8e5170e2014-04-02 12:09:55 -0700350 SwitchQueueEntry queueEntry;
351 synchronized (queue) {
352 queueEntry = queue.poll();
353 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800354
Ray Milkey8e5170e2014-04-02 12:09:55 -0700355 OFMessage msg = queueEntry.getOFMessage();
356 try {
357 messageDamper.write(sw, msg, context);
358 if (log.isTraceEnabled()) {
359 log.trace("Pusher sends message : {}", msg);
360 }
361 size += msg.getLength();
362 } catch (IOException e) {
363 e.printStackTrace();
364 log.error("Exception in sending message ({}) : {}", msg, e);
365 }
366 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800367
Ray Milkey8e5170e2014-04-02 12:09:55 -0700368 sw.flush();
369 queue.logSentData(current_time, size);
370 }
371 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800372
Ray Milkey8e5170e2014-04-02 12:09:55 -0700373 private boolean queuesHasMessageToSend() {
374 for (SwitchQueue queue : assignedQueues.values()) {
375 if (queue.hasMessageToSend()) {
376 return true;
377 }
378 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700379
Ray Milkey8e5170e2014-04-02 12:09:55 -0700380 return false;
381 }
Naoki Shiota05334692014-03-18 16:06:36 -0700382
Ray Milkey8e5170e2014-04-02 12:09:55 -0700383 private void notifyMessagePushed() {
384 queuingLock.lock();
385 try {
386 messagePushed.signal();
387 } finally {
388 queuingLock.unlock();
389 }
390 }
391 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700392
Ray Milkey8e5170e2014-04-02 12:09:55 -0700393 /**
394 * Initialize object with one thread.
395 */
396 public FlowPusher() {
397 number_thread = DEFAULT_NUMBER_THREAD;
398 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800399
Ray Milkey8e5170e2014-04-02 12:09:55 -0700400 /**
401 * Initialize object with threads of given number.
402 *
403 * @param number_thread Number of threads to handle messages.
404 */
405 public FlowPusher(int number_thread) {
406 if (number_thread > 0) {
407 this.number_thread = number_thread;
408 } else {
409 this.number_thread = DEFAULT_NUMBER_THREAD;
410 }
411 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800412
Ray Milkey8e5170e2014-04-02 12:09:55 -0700413 /**
414 * Set parameters needed for sending messages.
415 *
416 * @param context FloodlightContext used for sending messages.
417 * If null, FlowPusher uses default context.
418 * @param modContext FloodlightModuleContext used for acquiring
419 * ThreadPoolService and registering MessageListener.
420 * @param factory Factory object to create OFMessage objects.
421 * @param damper Message damper used for sending messages.
422 * If null, FlowPusher creates its own damper object.
423 */
424 public void init(FloodlightContext context,
425 FloodlightModuleContext modContext,
426 BasicFactory factory,
427 OFMessageDamper damper) {
428 this.context = context;
429 this.factory = factory;
430 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
431 IFloodlightProviderService flservice
432 = modContext.getServiceImpl(IFloodlightProviderService.class);
433 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800434
Ray Milkey8e5170e2014-04-02 12:09:55 -0700435 if (damper != null) {
436 messageDamper = damper;
437 } else {
438 // use default values
439 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
440 EnumSet.of(OFType.FLOW_MOD),
441 OFMESSAGE_DAMPER_TIMEOUT);
442 }
443 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800444
Ray Milkey8e5170e2014-04-02 12:09:55 -0700445 /**
446 * Begin processing queue.
447 */
448 public void start() {
449 if (factory == null) {
450 log.error("FlowPusher not yet initialized.");
451 return;
452 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800453
Ray Milkey8e5170e2014-04-02 12:09:55 -0700454 threadMap = new HashMap<Long, FlowPusherThread>();
455 for (long i = 0; i < number_thread; ++i) {
456 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800457
Ray Milkey8e5170e2014-04-02 12:09:55 -0700458 threadMap.put(i, thread);
459 thread.start();
460 }
461 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800462
Ray Milkey8e5170e2014-04-02 12:09:55 -0700463 @Override
464 public boolean suspend(IOFSwitch sw) {
465 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700466
Ray Milkey8e5170e2014-04-02 12:09:55 -0700467 if (queue == null) {
468 // create queue in case suspend is called before first message addition
469 queue = createQueueImpl(sw);
470 }
471
472 synchronized (queue) {
473 if (queue.state == QueueState.READY) {
474 queue.state = QueueState.SUSPENDED;
475 return true;
476 }
477 return false;
478 }
479 }
480
481 @Override
482 public boolean resume(IOFSwitch sw) {
483 SwitchQueue queue = getQueue(sw);
484
485 if (queue == null) {
486 log.error("No queue is attached to DPID : {}", sw.getId());
487 return false;
488 }
489
490 synchronized (queue) {
491 if (queue.state == QueueState.SUSPENDED) {
492 queue.state = QueueState.READY;
493
494 // Free the latch if queue has any messages
495 FlowPusherThread thread = getProcessingThread(sw);
496 if (queue.hasMessageToSend()) {
497 thread.notifyMessagePushed();
498 }
499 return true;
500 }
501 return false;
502 }
503 }
504
505 @Override
506 public QueueState getState(IOFSwitch sw) {
507 SwitchQueue queue = getQueue(sw);
508
509 if (queue == null) {
510 return QueueState.UNKNOWN;
511 }
512
513 return queue.state;
514 }
515
516 /**
517 * Stop processing queue and exit thread.
518 */
519 public void stop() {
520 if (threadMap == null) {
521 return;
522 }
523
524 for (FlowPusherThread t : threadMap.values()) {
525 t.interrupt();
526 }
527 }
528
529 @Override
530 public void setRate(IOFSwitch sw, long rate) {
531 SwitchQueue queue = getQueue(sw);
532 if (queue == null) {
533 queue = createQueueImpl(sw);
534 }
535
536 if (rate > 0) {
537 log.debug("rate for {} is set to {}", sw.getId(), rate);
538 synchronized (queue) {
539 queue.max_rate = rate;
540 }
541 }
542 }
543
544 @Override
545 public boolean createQueue(IOFSwitch sw) {
546 SwitchQueue queue = createQueueImpl(sw);
547
548 return (queue != null);
549 }
550
551 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
552 SwitchQueue queue = getQueue(sw);
553 if (queue != null) {
554 return queue;
555 }
556
557 FlowPusherThread proc = getProcessingThread(sw);
558 queue = new SwitchQueue();
559 queue.state = QueueState.READY;
560 proc.assignedQueues.put(sw, queue);
561
562 return queue;
563 }
564
565 @Override
566 public boolean deleteQueue(IOFSwitch sw) {
567 return deleteQueue(sw, false);
568 }
569
570 @Override
571 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
572 FlowPusherThread proc = getProcessingThread(sw);
573
574 if (forceStop) {
575 SwitchQueue queue = proc.assignedQueues.remove(sw);
576 if (queue == null) {
577 return false;
578 }
579 return true;
580 } else {
581 SwitchQueue queue = getQueue(sw);
582 if (queue == null) {
583 return false;
584 }
585 synchronized (queue) {
586 queue.toBeDeleted = true;
587 }
588 return true;
589 }
590 }
591
592 @Override
593 public boolean add(IOFSwitch sw, OFMessage msg) {
594 return add(sw, msg, MsgPriority.NORMAL);
595 }
596
597 @Override
598 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
599 return addMessageImpl(sw, msg, priority);
600 }
601
602 @Override
603 public void pushFlowEntries(
604 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
605 pushFlowEntries(entries, MsgPriority.NORMAL);
606 }
607
608 @Override
609 public void pushFlowEntries(
610 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
611
612 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
613 add(entry.first, entry.second, priority);
614 }
615 }
616
617 @Override
618 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
619 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
620 }
621
622 @Override
623 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
624 Collection<Pair<IOFSwitch, FlowEntry>> entries =
625 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
626
627 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
628 pushFlowEntries(entries, priority);
629 }
630
631 /**
632 * Create a message from FlowEntry and add it to the queue of the switch.
633 *
634 * @param sw Switch to which message is pushed.
635 * @param flowEntry FlowEntry object used for creating message.
636 * @return true if message is successfully added to a queue.
637 */
638 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
639 //
640 // Create the OpenFlow Flow Modification Entry to push
641 //
642 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
643 long cookie = flowEntry.flowEntryId().value();
644
645 short flowModCommand = OFFlowMod.OFPFC_ADD;
646 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
647 flowModCommand = OFFlowMod.OFPFC_ADD;
648 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
649 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
650 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
651 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
652 } else {
653 // Unknown user state. Ignore the entry
654 log.debug(
655 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
656 flowEntry.flowEntryId(),
657 flowEntry.flowEntryUserState());
658 return false;
659 }
660
661 //
662 // Fetch the match conditions.
663 //
664 // NOTE: The Flow matching conditions common for all Flow Entries are
665 // used ONLY if a Flow Entry does NOT have the corresponding matching
666 // condition set.
667 //
668 OFMatch match = new OFMatch();
669 match.setWildcards(OFMatch.OFPFW_ALL);
670 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
671
672 // Match the Incoming Port
673 Port matchInPort = flowEntryMatch.inPort();
674 if (matchInPort != null) {
675 match.setInputPort(matchInPort.value());
676 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
677 }
678
679 // Match the Source MAC address
680 MACAddress matchSrcMac = flowEntryMatch.srcMac();
681 if (matchSrcMac != null) {
682 match.setDataLayerSource(matchSrcMac.toString());
683 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
684 }
685
686 // Match the Destination MAC address
687 MACAddress matchDstMac = flowEntryMatch.dstMac();
688 if (matchDstMac != null) {
689 match.setDataLayerDestination(matchDstMac.toString());
690 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
691 }
692
693 // Match the Ethernet Frame Type
694 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
695 if (matchEthernetFrameType != null) {
696 match.setDataLayerType(matchEthernetFrameType);
697 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
698 }
699
700 // Match the VLAN ID
701 Short matchVlanId = flowEntryMatch.vlanId();
702 if (matchVlanId != null) {
703 match.setDataLayerVirtualLan(matchVlanId);
704 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
705 }
706
707 // Match the VLAN priority
708 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
709 if (matchVlanPriority != null) {
710 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
711 match.setWildcards(match.getWildcards()
712 & ~OFMatch.OFPFW_DL_VLAN_PCP);
713 }
714
715 // Match the Source IPv4 Network prefix
716 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
717 if (matchSrcIPv4Net != null) {
718 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
719 }
720
721 // Natch the Destination IPv4 Network prefix
722 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
723 if (matchDstIPv4Net != null) {
724 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
725 }
726
727 // Match the IP protocol
728 Byte matchIpProto = flowEntryMatch.ipProto();
729 if (matchIpProto != null) {
730 match.setNetworkProtocol(matchIpProto);
731 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
732 }
733
734 // Match the IP ToS (DSCP field, 6 bits)
735 Byte matchIpToS = flowEntryMatch.ipToS();
736 if (matchIpToS != null) {
737 match.setNetworkTypeOfService(matchIpToS);
738 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
739 }
740
741 // Match the Source TCP/UDP port
742 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
743 if (matchSrcTcpUdpPort != null) {
744 match.setTransportSource(matchSrcTcpUdpPort);
745 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
746 }
747
748 // Match the Destination TCP/UDP port
749 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
750 if (matchDstTcpUdpPort != null) {
751 match.setTransportDestination(matchDstTcpUdpPort);
752 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
753 }
754
755 //
756 // Fetch the actions
757 //
758 Short actionOutputPort = null;
759 List<OFAction> openFlowActions = new ArrayList<OFAction>();
760 int actionsLen = 0;
761 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
762 //
763 for (FlowEntryAction action : flowEntryActions.actions()) {
764 ActionOutput actionOutput = action.actionOutput();
765 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
766 ActionSetVlanPriority actionSetVlanPriority = action
767 .actionSetVlanPriority();
768 ActionStripVlan actionStripVlan = action.actionStripVlan();
769 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
770 .actionSetEthernetSrcAddr();
771 ActionSetEthernetAddr actionSetEthernetDstAddr = action
772 .actionSetEthernetDstAddr();
773 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
774 .actionSetIPv4SrcAddr();
775 ActionSetIPv4Addr actionSetIPv4DstAddr = action
776 .actionSetIPv4DstAddr();
777 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
778 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
779 .actionSetTcpUdpSrcPort();
780 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
781 .actionSetTcpUdpDstPort();
782 ActionEnqueue actionEnqueue = action.actionEnqueue();
783
784 if (actionOutput != null) {
785 actionOutputPort = actionOutput.port().value();
786 // XXX: The max length is hard-coded for now
787 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
788 .value(), (short) 0xffff);
789 openFlowActions.add(ofa);
790 actionsLen += ofa.getLength();
791 }
792
793 if (actionSetVlanId != null) {
794 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
795 actionSetVlanId.vlanId());
796 openFlowActions.add(ofa);
797 actionsLen += ofa.getLength();
798 }
799
800 if (actionSetVlanPriority != null) {
801 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
802 actionSetVlanPriority.vlanPriority());
803 openFlowActions.add(ofa);
804 actionsLen += ofa.getLength();
805 }
806
807 if (actionStripVlan != null) {
808 if (actionStripVlan.stripVlan() == true) {
809 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
810 openFlowActions.add(ofa);
811 actionsLen += ofa.getLength();
812 }
813 }
814
815 if (actionSetEthernetSrcAddr != null) {
816 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
817 actionSetEthernetSrcAddr.addr().toBytes());
818 openFlowActions.add(ofa);
819 actionsLen += ofa.getLength();
820 }
821
822 if (actionSetEthernetDstAddr != null) {
823 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
824 actionSetEthernetDstAddr.addr().toBytes());
825 openFlowActions.add(ofa);
826 actionsLen += ofa.getLength();
827 }
828
829 if (actionSetIPv4SrcAddr != null) {
830 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
831 actionSetIPv4SrcAddr.addr().value());
832 openFlowActions.add(ofa);
833 actionsLen += ofa.getLength();
834 }
835
836 if (actionSetIPv4DstAddr != null) {
837 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
838 actionSetIPv4DstAddr.addr().value());
839 openFlowActions.add(ofa);
840 actionsLen += ofa.getLength();
841 }
842
843 if (actionSetIpToS != null) {
844 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
845 actionSetIpToS.ipToS());
846 openFlowActions.add(ofa);
847 actionsLen += ofa.getLength();
848 }
849
850 if (actionSetTcpUdpSrcPort != null) {
851 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
852 actionSetTcpUdpSrcPort.port());
853 openFlowActions.add(ofa);
854 actionsLen += ofa.getLength();
855 }
856
857 if (actionSetTcpUdpDstPort != null) {
858 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
859 actionSetTcpUdpDstPort.port());
860 openFlowActions.add(ofa);
861 actionsLen += ofa.getLength();
862 }
863
864 if (actionEnqueue != null) {
865 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
866 .value(), actionEnqueue.queueId());
867 openFlowActions.add(ofa);
868 actionsLen += ofa.getLength();
869 }
870 }
871
872 fm.setIdleTimeout((short) flowEntry.idleTimeout())
873 .setHardTimeout((short) flowEntry.hardTimeout())
874 .setPriority((short) flowEntry.priority())
875 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
876 .setCommand(flowModCommand).setMatch(match)
877 .setActions(openFlowActions)
878 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
879 fm.setOutPort(OFPort.OFPP_NONE.getValue());
880 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
881 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
882 if (actionOutputPort != null)
883 fm.setOutPort(actionOutputPort);
884 }
885
886 //
887 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
888 // permanent.
889 //
890 if ((flowEntry.idleTimeout() != 0) ||
891 (flowEntry.hardTimeout() != 0)) {
892 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
893 }
894
895 if (log.isTraceEnabled()) {
896 log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
897 , flowEntry.flowEntryUserState()
898 , sw.getStringId()
899 , flowEntry.flowEntryId()
900 , matchSrcMac
901 , matchDstMac
902 , matchInPort
903 , actionOutputPort
904 );
905 }
906
907 return addMessageImpl(sw, fm, priority);
908 }
909
910 /**
911 * Add message to queue
912 *
913 * @param sw
914 * @param msg
915 * @param flowEntryId
916 * @return
917 */
918 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
919 FlowPusherThread thread = getProcessingThread(sw);
920
921 SwitchQueue queue = getQueue(sw);
922
923 // create queue at first addition of message
924 if (queue == null) {
925 queue = createQueueImpl(sw);
926 }
927
928 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
929
930 synchronized (queue) {
931 queue.add(entry, priority);
932 if (log.isTraceEnabled()) {
933 log.trace("Message is pushed : {}", entry.getOFMessage());
934 }
935 }
936
937 thread.notifyMessagePushed();
938
939 return true;
940 }
941
942 @Override
943 public OFBarrierReply barrier(IOFSwitch sw) {
944 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
945 if (future == null) {
946 return null;
947 }
948
949 try {
950 return future.get();
951 } catch (InterruptedException e) {
952 e.printStackTrace();
953 log.error("InterruptedException: {}", e);
954 return null;
955 } catch (ExecutionException e) {
956 e.printStackTrace();
957 log.error("ExecutionException: {}", e);
958 return null;
959 }
960 }
961
962 @Override
963 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
964 // TODO creation of message and future should be moved to OFSwitchImpl
965
966 if (sw == null) {
967 return null;
968 }
969
970 OFBarrierRequest msg = createBarrierRequest(sw);
971
972 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
973 barrierFutures.put(BarrierInfo.create(sw, msg), future);
974
975 addMessageImpl(sw, msg, MsgPriority.NORMAL);
976
977 return future;
978 }
979
980 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
981 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
982 msg.setXid(sw.getNextTransactionId());
983
984 return msg;
985 }
986
987 /**
988 * Get a queue attached to a switch.
989 *
990 * @param sw Switch object
991 * @return Queue object
992 */
993 protected SwitchQueue getQueue(IOFSwitch sw) {
994 if (sw == null) {
995 return null;
996 }
997
998 FlowPusherThread th = getProcessingThread(sw);
999 if (th == null) {
1000 return null;
1001 }
1002
1003 return th.assignedQueues.get(sw);
1004 }
1005
1006 /**
1007 * Get a hash value correspondent to a switch.
1008 *
1009 * @param sw Switch object
1010 * @return Hash value
1011 */
1012 protected long getHash(IOFSwitch sw) {
1013 // This code assumes DPID is sequentially assigned.
1014 // TODO consider equalization algorithm
1015 return sw.getId() % number_thread;
1016 }
1017
1018 /**
1019 * Get a Thread object which processes the queue attached to a switch.
1020 *
1021 * @param sw Switch object
1022 * @return Thread object
1023 */
1024 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
1025 long hash = getHash(sw);
1026
1027 return threadMap.get(hash);
1028 }
1029
1030 @Override
1031 public String getName() {
1032 return "flowpusher";
1033 }
1034
1035 @Override
1036 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1037 return false;
1038 }
1039
1040 @Override
1041 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1042 return false;
1043 }
1044
1045 @Override
1046 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
1047 if (log.isTraceEnabled()) {
1048 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1049 }
1050
1051 if (msg.getType() != OFType.BARRIER_REPLY) {
1052 log.error("Unexpected reply message : {}", msg.getType());
1053 return Command.CONTINUE;
1054 }
1055
1056 OFBarrierReply reply = (OFBarrierReply) msg;
1057 BarrierInfo info = BarrierInfo.create(sw, reply);
1058
1059 // Deliver future if exists
1060 OFBarrierReplyFuture future = barrierFutures.get(info);
1061 if (future != null) {
1062 future.deliverFuture(sw, msg);
1063 barrierFutures.remove(info);
1064 }
1065
1066 return Command.CONTINUE;
1067 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001068}