blob: 3a0407f1b050c5564c394c8b543df92aaa3bea9f [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import org.openflow.protocol.*;
22import org.openflow.protocol.action.*;
23import org.openflow.protocol.factory.BasicFactory;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026
27import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080028import net.floodlightcontroller.core.IFloodlightProviderService;
29import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070030import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080031import net.floodlightcontroller.core.internal.OFMessageFuture;
32import net.floodlightcontroller.core.module.FloodlightModuleContext;
33import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryAction;
37import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080038import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080039import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080040import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
41import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080042import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080043import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080044import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045
46/**
Naoki Shiotab485d412013-11-26 12:04:19 -080047 * FlowPusher is a implementation of FlowPusherService.
48 * FlowPusher assigns one message queue instance for each one switch.
49 * Number of message processing threads is configurable by constructor, and
50 * one thread can handle multiple message queues. Each queue will be assigned to
51 * a thread according to hash function defined by getHash().
52 * Each processing thread reads messages from queues and sends it to switches
53 * in round-robin. Processing thread also calculates rate of sending to suppress
54 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070055 * @author Naoki Shiota
56 *
57 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080058public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070060 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080061
62 // NOTE: Below are moved from FlowManager.
63 // TODO: Values copied from elsewhere (class LearningSwitch).
64 // The local copy should go away!
65 //
66 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
67 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080068
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080069 // Number of messages sent to switch at once
70 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080071
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070072 private static class SwitchQueueEntry {
73 OFMessage msg;
74
75 public SwitchQueueEntry(OFMessage msg) {
76 this.msg = msg;
77 }
78
79 public OFMessage getOFMessage() {
80 return msg;
81 }
82 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070083
Naoki Shiotac1601d32013-11-20 10:47:34 -080084 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080085 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080086 * This consists of queue itself and variables used for limiting sending rate.
87 * @author Naoki Shiota
88 *
89 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -070090 private class SwitchQueue {
91 List<Queue<SwitchQueueEntry>> raw_queues;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080092 QueueState state;
93
Naoki Shiotae3199732013-11-25 16:14:43 -080094 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080095 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070096 long last_sent_time = 0;
97 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080098
Naoki Shiotae3199732013-11-25 16:14:43 -080099 // "To be deleted" flag
100 boolean toBeDeleted = false;
101
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700102 SwitchQueue() {
103 raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
104 MsgPriority.values().length);
105 for (int i = 0; i < MsgPriority.values().length; ++i) {
106 raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
107 }
108
109 state = QueueState.READY;
110 }
111
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800112 /**
113 * Check if sending rate is within the rate
114 * @param current Current time
115 * @return true if within the rate
116 */
117 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800118 if (max_rate == 0) {
119 // no limitation
120 return true;
121 }
122
Naoki Shiota81dbe302013-11-21 15:35:38 -0800123 if (current == last_sent_time) {
124 return false;
125 }
126
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800128 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800129 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 }
131
Naoki Shiota81dbe302013-11-21 15:35:38 -0800132 /**
133 * Log time and size of last sent data.
134 * @param current Time to be sent.
135 * @param size Size of sent data (in bytes).
136 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800137 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800139 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800140 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700141
142 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
143 Queue<SwitchQueueEntry> queue = getQueue(priority);
144 if (queue == null) {
145 log.error("Unexpected priority : ", priority);
146 return false;
147 }
148 return queue.add(entry);
149 }
150
151 /**
152 * Poll single appropriate entry object according to QueueState.
153 * @return Entry object.
154 */
155 SwitchQueueEntry poll() {
156 switch (state) {
157 case READY:
158 {
159 for (int i = 0; i < raw_queues.size(); ++i) {
160 SwitchQueueEntry entry = raw_queues.get(i).poll();
161 if (entry != null) {
162 return entry;
163 }
164 }
165
166 return null;
167 }
168 case SUSPENDED:
169 {
170 // Only polling from high priority queue
171 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
172 return entry;
173 }
174 default:
175 log.error("Unexpected QueueState : ", state);
176 return null;
177 }
178 }
179
180 /**
181 * Check if this object has any messages in the queues to be sent
182 * @return True if there are some messages to be sent.
183 */
184 boolean hasMessageToSend() {
185 switch (state) {
186 case READY:
187 for (Queue<SwitchQueueEntry> queue : raw_queues) {
188 if (! queue.isEmpty()) {
189 return true;
190 }
191 }
192 break;
193 case SUSPENDED:
194 // Only checking high priority queue
195 return (! getQueue(MsgPriority.HIGH).isEmpty());
196 default:
197 log.error("Unexpected QueueState : ", state);
198 return false;
199 }
200
201 return false;
202 }
203
204 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
205 return raw_queues.get(priority.ordinal());
206 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700207 }
208
209 /**
210 * BarrierInfo holds information to specify barrier message sent to switch.
211 * @author Naoki
212 */
213 private static class BarrierInfo {
214 final long dpid;
215 final int xid;
216
217 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
218 return new BarrierInfo(sw.getId(), req.getXid());
219 }
220
221 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
222 return new BarrierInfo(sw.getId(), rpy.getXid());
223 }
224
225 private BarrierInfo(long dpid, int xid) {
226 this.dpid = dpid;
227 this.xid = xid;
228 }
229
230 // Auto generated code by Eclipse
231 @Override
232 public int hashCode() {
233 final int prime = 31;
234 int result = 1;
235 result = prime * result + (int) (dpid ^ (dpid >>> 32));
236 result = prime * result + xid;
237 return result;
238 }
239
240 @Override
241 public boolean equals(Object obj) {
242 if (this == obj)
243 return true;
244 if (obj == null)
245 return false;
246 if (getClass() != obj.getClass())
247 return false;
248
249 BarrierInfo other = (BarrierInfo) obj;
250 return (this.dpid == other.dpid) && (this.xid == other.xid);
251 }
252
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800253
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700254 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800255
Naoki Shiotac1601d32013-11-20 10:47:34 -0800256 private OFMessageDamper messageDamper = null;
257 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700258
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800259 private FloodlightContext context = null;
260 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800261
262 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800263 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700264 // Map from (DPID and transaction ID) to Future objects.
265 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
266 = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800267
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700268 private int number_thread;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800269
Naoki Shiota8739faa2013-11-18 17:00:25 -0800270 /**
271 * Main thread that reads messages from queues and sends them to switches.
272 * @author Naoki Shiota
273 *
274 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800275 private class FlowPusherThread extends Thread {
Naoki Shiota05334692014-03-18 16:06:36 -0700276 private Map<IOFSwitch,SwitchQueue> assignedQueues
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700277 = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800278
Naoki Shiota05334692014-03-18 16:06:36 -0700279 final Lock queuingLock = new ReentrantLock();
280 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800281
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700282 @Override
283 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800284 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700285 while (true) {
Naoki Shiota05334692014-03-18 16:06:36 -0700286 while (! queuesHasMessageToSend()) {
287 queuingLock.lock();
288
289 try {
290 // wait for message pushed to queue
291 messagePushed.await();
292 } catch (InterruptedException e) {
293 // Interrupted to be shut down (not an error)
294 log.debug("FlowPusherThread is interrupted");
295 return;
296 } finally {
297 queuingLock.unlock();
298 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800299 }
300
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700301 // for safety of concurrent access, copy set of key objects
Naoki Shiota05334692014-03-18 16:06:36 -0700302 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
303 for (IOFSwitch sw : assignedQueues.keySet()) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700304 keys.add(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800305 }
306
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800307 for (IOFSwitch sw : keys) {
Naoki Shiota05334692014-03-18 16:06:36 -0700308 SwitchQueue queue = assignedQueues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800309
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700310 if (sw == null || queue == null) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700311 continue;
312 }
313
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800314 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800315 processQueue(sw, queue, MAX_MESSAGE_SEND);
Naoki Shiota05334692014-03-18 16:06:36 -0700316 if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800317 // remove queue if flagged to be.
Naoki Shiota05334692014-03-18 16:06:36 -0700318 assignedQueues.remove(sw);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700319 }
320 }
321 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700322 }
323 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800324
325 /**
326 * Read messages from queue and send them to the switch.
327 * If number of messages excess the limit, stop sending messages.
328 * @param sw Switch to which messages will be sent.
329 * @param queue Queue of messages.
330 * @param max_msg Limitation of number of messages to be sent. If set to 0,
331 * all messages in queue will be sent.
332 */
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700333 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800334 // check sending rate and determine it to be sent or not
335 long current_time = System.currentTimeMillis();
336 long size = 0;
337
338 if (queue.isSendable(current_time)) {
339 int i = 0;
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700340 while (queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800341 // Number of messages excess the limit
342 if (0 < max_msg && max_msg <= i) {
343 break;
344 }
345 ++i;
346
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700347 SwitchQueueEntry queueEntry;
348 synchronized (queue) {
349 queueEntry = queue.poll();
350 }
351
352 OFMessage msg = queueEntry.getOFMessage();
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800353 try {
354 messageDamper.write(sw, msg, context);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700355 if (log.isTraceEnabled()) {
356 log.trace("Pusher sends message : {}", msg);
357 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800358 size += msg.getLength();
359 } catch (IOException e) {
360 e.printStackTrace();
361 log.error("Exception in sending message ({}) : {}", msg, e);
362 }
363 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700364
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800365 sw.flush();
366 queue.logSentData(current_time, size);
367 }
368 }
Naoki Shiota05334692014-03-18 16:06:36 -0700369
370 private boolean queuesHasMessageToSend() {
371 for (SwitchQueue queue : assignedQueues.values()) {
372 if (queue.hasMessageToSend()) {
373 return true;
374 }
375 }
376
377 return false;
378 }
379
380 private void notifyMessagePushed() {
381 queuingLock.lock();
382 try {
383 messagePushed.signal();
384 } finally {
385 queuingLock.unlock();
386 }
387 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700388 }
389
Naoki Shiotac1601d32013-11-20 10:47:34 -0800390 /**
391 * Initialize object with one thread.
392 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800393 public FlowPusher() {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700394 number_thread = DEFAULT_NUMBER_THREAD;
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800395 }
396
Naoki Shiotac1601d32013-11-20 10:47:34 -0800397 /**
398 * Initialize object with threads of given number.
399 * @param number_thread Number of threads to handle messages.
400 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800401 public FlowPusher(int number_thread) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700402 if (number_thread > 0) {
403 this.number_thread = number_thread;
404 } else {
405 this.number_thread = DEFAULT_NUMBER_THREAD;
406 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800407 }
408
Naoki Shiotac1601d32013-11-20 10:47:34 -0800409 /**
410 * Set parameters needed for sending messages.
411 * @param context FloodlightContext used for sending messages.
412 * If null, FlowPusher uses default context.
413 * @param modContext FloodlightModuleContext used for acquiring
414 * ThreadPoolService and registering MessageListener.
415 * @param factory Factory object to create OFMessage objects.
416 * @param damper Message damper used for sending messages.
417 * If null, FlowPusher creates its own damper object.
418 */
419 public void init(FloodlightContext context,
420 FloodlightModuleContext modContext,
421 BasicFactory factory,
422 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700423 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800424 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800425 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
Naoki Shiota05334692014-03-18 16:06:36 -0700426 IFloodlightProviderService flservice
427 = modContext.getServiceImpl(IFloodlightProviderService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800428 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800429
430 if (damper != null) {
431 messageDamper = damper;
432 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800433 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800434 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
435 EnumSet.of(OFType.FLOW_MOD),
436 OFMESSAGE_DAMPER_TIMEOUT);
437 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700438 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800439
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800440 /**
441 * Begin processing queue.
442 */
443 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800444 if (factory == null) {
445 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800446 return;
447 }
448
Naoki Shiota81dbe302013-11-21 15:35:38 -0800449 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800450 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800451 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800452
Naoki Shiota81dbe302013-11-21 15:35:38 -0800453 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800454 thread.start();
455 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700456 }
457
Brian O'Connor8c166a72013-11-14 18:41:48 -0800458 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800459 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800460 SwitchQueue queue = getQueue(sw);
461
462 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700463 // create queue in case suspend is called before first message addition
464 queue = createQueueImpl(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800465 }
466
467 synchronized (queue) {
468 if (queue.state == QueueState.READY) {
469 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800470 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800471 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800472 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800473 }
474 }
475
Brian O'Connor8c166a72013-11-14 18:41:48 -0800476 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800477 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800478 SwitchQueue queue = getQueue(sw);
479
480 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700481 log.error("No queue is attached to DPID : {}", sw.getId());
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800482 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800483 }
484
485 synchronized (queue) {
486 if (queue.state == QueueState.SUSPENDED) {
487 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800488
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700489 // Free the latch if queue has any messages
490 FlowPusherThread thread = getProcessingThread(sw);
Naoki Shiota05334692014-03-18 16:06:36 -0700491 if (queue.hasMessageToSend()) {
492 thread.notifyMessagePushed();
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800493 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800494 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800495 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800496 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800497 }
498 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800499
Brian O'Connor8c166a72013-11-14 18:41:48 -0800500 @Override
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700501 public QueueState getState(IOFSwitch sw) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800502 SwitchQueue queue = getQueue(sw);
503
504 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700505 return QueueState.UNKNOWN;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800506 }
507
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700508 return queue.state;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800509 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800510
511 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800512 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800513 */
514 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800515 if (threadMap == null) {
516 return;
517 }
518
Naoki Shiota81dbe302013-11-21 15:35:38 -0800519 for (FlowPusherThread t : threadMap.values()) {
520 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800521 }
522 }
523
Naoki Shiotae3199732013-11-25 16:14:43 -0800524 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800525 public void setRate(IOFSwitch sw, long rate) {
526 SwitchQueue queue = getQueue(sw);
527 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700528 queue = createQueueImpl(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800529 }
530
531 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800532 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700533 synchronized (queue) {
534 queue.max_rate = rate;
535 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700536 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700537 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800538
539 @Override
540 public boolean createQueue(IOFSwitch sw) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700541 SwitchQueue queue = createQueueImpl(sw);
542
543 return (queue != null);
544 }
545
546 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800547 SwitchQueue queue = getQueue(sw);
548 if (queue != null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700549 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800550 }
551
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700552 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800553 queue = new SwitchQueue();
554 queue.state = QueueState.READY;
Naoki Shiota05334692014-03-18 16:06:36 -0700555 proc.assignedQueues.put(sw, queue);
Naoki Shiotae3199732013-11-25 16:14:43 -0800556
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700557 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800558 }
559
560 @Override
561 public boolean deleteQueue(IOFSwitch sw) {
562 return deleteQueue(sw, false);
563 }
564
565 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800566 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700567 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800568
Naoki Shiotab485d412013-11-26 12:04:19 -0800569 if (forceStop) {
Naoki Shiota05334692014-03-18 16:06:36 -0700570 SwitchQueue queue = proc.assignedQueues.remove(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700571 if (queue == null) {
572 return false;
Naoki Shiotae3199732013-11-25 16:14:43 -0800573 }
574 return true;
575 } else {
576 SwitchQueue queue = getQueue(sw);
577 if (queue == null) {
578 return false;
579 }
580 synchronized (queue) {
581 queue.toBeDeleted = true;
582 }
583 return true;
584 }
585 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700586
Brian O'Connor8c166a72013-11-14 18:41:48 -0800587 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800588 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700589 return add(sw, msg, MsgPriority.NORMAL);
590 }
591
592 @Override
593 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
594 return addMessageImpl(sw, msg, priority);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800595 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700596
Brian O'Connor8c166a72013-11-14 18:41:48 -0800597 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800598 public void pushFlowEntries(
599 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700600 pushFlowEntries(entries, MsgPriority.NORMAL);
601 }
602
603 @Override
604 public void pushFlowEntries(
605 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800606
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800607 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700608 add(entry.first, entry.second, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800609 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800610 }
611
612 @Override
613 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700614 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
615 }
616
617 @Override
618 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800619 Collection<Pair<IOFSwitch, FlowEntry>> entries =
620 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
621
622 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700623 pushFlowEntries(entries, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800624 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700625
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800626 /**
627 * Create a message from FlowEntry and add it to the queue of the switch.
628 * @param sw Switch to which message is pushed.
629 * @param flowEntry FlowEntry object used for creating message.
630 * @return true if message is successfully added to a queue.
631 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700632 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800633 //
634 // Create the OpenFlow Flow Modification Entry to push
635 //
636 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
637 long cookie = flowEntry.flowEntryId().value();
638
639 short flowModCommand = OFFlowMod.OFPFC_ADD;
640 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
641 flowModCommand = OFFlowMod.OFPFC_ADD;
642 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
643 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
644 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
645 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
646 } else {
647 // Unknown user state. Ignore the entry
648 log.debug(
649 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800650 flowEntry.flowEntryId(),
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800651 flowEntry.flowEntryUserState());
652 return false;
653 }
654
655 //
656 // Fetch the match conditions.
657 //
658 // NOTE: The Flow matching conditions common for all Flow Entries are
659 // used ONLY if a Flow Entry does NOT have the corresponding matching
660 // condition set.
661 //
662 OFMatch match = new OFMatch();
663 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800664 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
665
666 // Match the Incoming Port
667 Port matchInPort = flowEntryMatch.inPort();
668 if (matchInPort != null) {
669 match.setInputPort(matchInPort.value());
670 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
671 }
672
673 // Match the Source MAC address
674 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800675 if (matchSrcMac != null) {
676 match.setDataLayerSource(matchSrcMac.toString());
677 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
678 }
679
680 // Match the Destination MAC address
681 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800682 if (matchDstMac != null) {
683 match.setDataLayerDestination(matchDstMac.toString());
684 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
685 }
686
687 // Match the Ethernet Frame Type
688 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800689 if (matchEthernetFrameType != null) {
690 match.setDataLayerType(matchEthernetFrameType);
691 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
692 }
693
694 // Match the VLAN ID
695 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800696 if (matchVlanId != null) {
697 match.setDataLayerVirtualLan(matchVlanId);
698 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
699 }
700
701 // Match the VLAN priority
702 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800703 if (matchVlanPriority != null) {
704 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
705 match.setWildcards(match.getWildcards()
706 & ~OFMatch.OFPFW_DL_VLAN_PCP);
707 }
708
709 // Match the Source IPv4 Network prefix
710 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800711 if (matchSrcIPv4Net != null) {
712 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
713 }
714
715 // Natch the Destination IPv4 Network prefix
716 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800717 if (matchDstIPv4Net != null) {
718 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
719 }
720
721 // Match the IP protocol
722 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800723 if (matchIpProto != null) {
724 match.setNetworkProtocol(matchIpProto);
725 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
726 }
727
728 // Match the IP ToS (DSCP field, 6 bits)
729 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800730 if (matchIpToS != null) {
731 match.setNetworkTypeOfService(matchIpToS);
732 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
733 }
734
735 // Match the Source TCP/UDP port
736 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800737 if (matchSrcTcpUdpPort != null) {
738 match.setTransportSource(matchSrcTcpUdpPort);
739 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
740 }
741
742 // Match the Destination TCP/UDP port
743 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800744 if (matchDstTcpUdpPort != null) {
745 match.setTransportDestination(matchDstTcpUdpPort);
746 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
747 }
748
749 //
750 // Fetch the actions
751 //
752 Short actionOutputPort = null;
753 List<OFAction> openFlowActions = new ArrayList<OFAction>();
754 int actionsLen = 0;
755 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
756 //
757 for (FlowEntryAction action : flowEntryActions.actions()) {
758 ActionOutput actionOutput = action.actionOutput();
759 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
760 ActionSetVlanPriority actionSetVlanPriority = action
761 .actionSetVlanPriority();
762 ActionStripVlan actionStripVlan = action.actionStripVlan();
763 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
764 .actionSetEthernetSrcAddr();
765 ActionSetEthernetAddr actionSetEthernetDstAddr = action
766 .actionSetEthernetDstAddr();
767 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
768 .actionSetIPv4SrcAddr();
769 ActionSetIPv4Addr actionSetIPv4DstAddr = action
770 .actionSetIPv4DstAddr();
771 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
772 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
773 .actionSetTcpUdpSrcPort();
774 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
775 .actionSetTcpUdpDstPort();
776 ActionEnqueue actionEnqueue = action.actionEnqueue();
777
778 if (actionOutput != null) {
779 actionOutputPort = actionOutput.port().value();
780 // XXX: The max length is hard-coded for now
781 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
782 .value(), (short) 0xffff);
783 openFlowActions.add(ofa);
784 actionsLen += ofa.getLength();
785 }
786
787 if (actionSetVlanId != null) {
788 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
789 actionSetVlanId.vlanId());
790 openFlowActions.add(ofa);
791 actionsLen += ofa.getLength();
792 }
793
794 if (actionSetVlanPriority != null) {
795 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
796 actionSetVlanPriority.vlanPriority());
797 openFlowActions.add(ofa);
798 actionsLen += ofa.getLength();
799 }
800
801 if (actionStripVlan != null) {
802 if (actionStripVlan.stripVlan() == true) {
803 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
804 openFlowActions.add(ofa);
805 actionsLen += ofa.getLength();
806 }
807 }
808
809 if (actionSetEthernetSrcAddr != null) {
810 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
811 actionSetEthernetSrcAddr.addr().toBytes());
812 openFlowActions.add(ofa);
813 actionsLen += ofa.getLength();
814 }
815
816 if (actionSetEthernetDstAddr != null) {
817 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
818 actionSetEthernetDstAddr.addr().toBytes());
819 openFlowActions.add(ofa);
820 actionsLen += ofa.getLength();
821 }
822
823 if (actionSetIPv4SrcAddr != null) {
824 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
825 actionSetIPv4SrcAddr.addr().value());
826 openFlowActions.add(ofa);
827 actionsLen += ofa.getLength();
828 }
829
830 if (actionSetIPv4DstAddr != null) {
831 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
832 actionSetIPv4DstAddr.addr().value());
833 openFlowActions.add(ofa);
834 actionsLen += ofa.getLength();
835 }
836
837 if (actionSetIpToS != null) {
838 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
839 actionSetIpToS.ipToS());
840 openFlowActions.add(ofa);
841 actionsLen += ofa.getLength();
842 }
843
844 if (actionSetTcpUdpSrcPort != null) {
845 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
846 actionSetTcpUdpSrcPort.port());
847 openFlowActions.add(ofa);
848 actionsLen += ofa.getLength();
849 }
850
851 if (actionSetTcpUdpDstPort != null) {
852 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
853 actionSetTcpUdpDstPort.port());
854 openFlowActions.add(ofa);
855 actionsLen += ofa.getLength();
856 }
857
858 if (actionEnqueue != null) {
859 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
860 .value(), actionEnqueue.queueId());
861 openFlowActions.add(ofa);
862 actionsLen += ofa.getLength();
863 }
864 }
865
Pavlin Radoslavov1fe06a22013-12-10 14:12:23 -0800866 fm.setIdleTimeout((short)flowEntry.idleTimeout())
867 .setHardTimeout((short)flowEntry.hardTimeout())
Pavlin Radoslavovafbf1032014-02-04 10:37:52 -0800868 .setPriority((short)flowEntry.priority())
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800869 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
870 .setCommand(flowModCommand).setMatch(match)
871 .setActions(openFlowActions)
872 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
873 fm.setOutPort(OFPort.OFPP_NONE.getValue());
874 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
875 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
876 if (actionOutputPort != null)
877 fm.setOutPort(actionOutputPort);
878 }
879
880 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800881 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
882 // permanent.
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800883 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800884 if ((flowEntry.idleTimeout() != 0) ||
885 (flowEntry.hardTimeout() != 0)) {
886 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
887 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800888
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700889 if (log.isTraceEnabled()) {
890 log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
891 , flowEntry.flowEntryUserState()
892 , sw.getStringId()
893 , flowEntry.flowEntryId()
894 , matchSrcMac
895 , matchDstMac
896 , matchInPort
897 , actionOutputPort
898 );
899 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800900
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700901 return addMessageImpl(sw, fm, priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700902 }
903
904 /**
905 * Add message to queue
906 * @param sw
907 * @param msg
908 * @param flowEntryId
909 * @return
910 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700911 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
Naoki Shiota05334692014-03-18 16:06:36 -0700912 FlowPusherThread thread = getProcessingThread(sw);
913
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700914 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700915
916 // create queue at first addition of message
917 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700918 queue = createQueueImpl(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700919 }
920
921 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
Naoki Shiota05334692014-03-18 16:06:36 -0700922
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700923 synchronized (queue) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700924 queue.add(entry,priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700925 if (log.isTraceEnabled()) {
926 log.trace("Message is pushed : {}", entry.getOFMessage());
927 }
928 }
929
Naoki Shiota05334692014-03-18 16:06:36 -0700930 thread.notifyMessagePushed();
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700931
932 return true;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800933 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800934
935 @Override
936 public OFBarrierReply barrier(IOFSwitch sw) {
937 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
938 if (future == null) {
939 return null;
940 }
941
942 try {
943 return future.get();
944 } catch (InterruptedException e) {
945 e.printStackTrace();
946 log.error("InterruptedException: {}", e);
947 return null;
948 } catch (ExecutionException e) {
949 e.printStackTrace();
950 log.error("ExecutionException: {}", e);
951 return null;
952 }
953 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800954
Naoki Shiotac1601d32013-11-20 10:47:34 -0800955 @Override
956 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
957 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800958
959 if (sw == null) {
960 return null;
961 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800962
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700963 OFBarrierRequest msg = createBarrierRequest(sw);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800964
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700965 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
966 barrierFutures.put(BarrierInfo.create(sw,msg), future);
967
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700968 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800969
Naoki Shiotac1601d32013-11-20 10:47:34 -0800970 return future;
971 }
972
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700973 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
974 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
975 msg.setXid(sw.getNextTransactionId());
976
977 return msg;
978 }
979
Naoki Shiotae3199732013-11-25 16:14:43 -0800980 /**
981 * Get a queue attached to a switch.
982 * @param sw Switch object
983 * @return Queue object
984 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800985 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800986 if (sw == null) {
987 return null;
988 }
989
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700990 FlowPusherThread th = getProcessingThread(sw);
991 if (th == null) {
992 return null;
993 }
994
Naoki Shiota05334692014-03-18 16:06:36 -0700995 return th.assignedQueues.get(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800996 }
997
Naoki Shiotae3199732013-11-25 16:14:43 -0800998 /**
999 * Get a hash value correspondent to a switch.
1000 * @param sw Switch object
1001 * @return Hash value
1002 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001003 protected long getHash(IOFSwitch sw) {
1004 // This code assumes DPID is sequentially assigned.
1005 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001006 return sw.getId() % number_thread;
1007 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001008
1009 /**
1010 * Get a Thread object which processes the queue attached to a switch.
1011 * @param sw Switch object
1012 * @return Thread object
1013 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -07001014 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001015 long hash = getHash(sw);
1016
1017 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001018 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001019
Naoki Shiotac1601d32013-11-20 10:47:34 -08001020 @Override
1021 public String getName() {
1022 return "flowpusher";
1023 }
1024
1025 @Override
1026 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1027 return false;
1028 }
1029
1030 @Override
1031 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1032 return false;
1033 }
1034
1035 @Override
1036 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001037 if (log.isTraceEnabled()) {
1038 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1039 }
1040
1041 if (msg.getType() != OFType.BARRIER_REPLY) {
1042 log.error("Unexpected reply message : {}", msg.getType());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001043 return Command.CONTINUE;
1044 }
1045
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001046 OFBarrierReply reply = (OFBarrierReply) msg;
1047 BarrierInfo info = BarrierInfo.create(sw,reply);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001048
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001049 // Deliver future if exists
1050 OFBarrierReplyFuture future = barrierFutures.get(info);
1051 if (future != null) {
1052 future.deliverFuture(sw, msg);
1053 barrierFutures.remove(info);
1054 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001055
1056 return Command.CONTINUE;
1057 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001058}