blob: d3476c7240231ff0800fbf49d8ac0f14e5339af1 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080017import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070018
Naoki Shiota7d0cf272013-11-05 10:18:12 -080019import org.openflow.protocol.*;
20import org.openflow.protocol.action.*;
21import org.openflow.protocol.factory.BasicFactory;
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024
25import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080026import net.floodlightcontroller.core.IFloodlightProviderService;
27import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070028import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080029import net.floodlightcontroller.core.internal.OFMessageFuture;
30import net.floodlightcontroller.core.module.FloodlightModuleContext;
31import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080032import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080034import net.onrc.onos.ofcontroller.util.FlowEntryAction;
35import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080036import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080037import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080038import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
39import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080040import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080041import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080042import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070043
44/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor,
 * and one thread can handle multiple message queues. Each queue is assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from its queues and sends them to the
 * switches in round-robin order. The processing thread also calculates the
 * sending rate in order to suppress excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070053 * @author Naoki Shiota
54 *
55 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080056public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070058 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059
60 // NOTE: Below are moved from FlowManager.
61 // TODO: Values copied from elsewhere (class LearningSwitch).
62 // The local copy should go away!
63 //
64 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
65 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080066
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080067 // Number of messages sent to switch at once
68 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080069
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070070 private static class SwitchQueueEntry {
71 OFMessage msg;
72
73 public SwitchQueueEntry(OFMessage msg) {
74 this.msg = msg;
75 }
76
77 public OFMessage getOFMessage() {
78 return msg;
79 }
80 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070081
Naoki Shiotac1601d32013-11-20 10:47:34 -080082 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080083 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080084 * This consists of queue itself and variables used for limiting sending rate.
85 * @author Naoki Shiota
86 *
87 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -070088 private class SwitchQueue {
89 List<Queue<SwitchQueueEntry>> raw_queues;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080090 QueueState state;
91
Naoki Shiotae3199732013-11-25 16:14:43 -080092 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080093 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070094 long last_sent_time = 0;
95 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080096
Naoki Shiotae3199732013-11-25 16:14:43 -080097 // "To be deleted" flag
98 boolean toBeDeleted = false;
99
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700100 SwitchQueue() {
101 raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
102 MsgPriority.values().length);
103 for (int i = 0; i < MsgPriority.values().length; ++i) {
104 raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
105 }
106
107 state = QueueState.READY;
108 }
109
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800110 /**
111 * Check if sending rate is within the rate
112 * @param current Current time
113 * @return true if within the rate
114 */
115 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800116 if (max_rate == 0) {
117 // no limitation
118 return true;
119 }
120
Naoki Shiota81dbe302013-11-21 15:35:38 -0800121 if (current == last_sent_time) {
122 return false;
123 }
124
Naoki Shiotac1601d32013-11-20 10:47:34 -0800125 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800126 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800127 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800128 }
129
Naoki Shiota81dbe302013-11-21 15:35:38 -0800130 /**
131 * Log time and size of last sent data.
132 * @param current Time to be sent.
133 * @param size Size of sent data (in bytes).
134 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800135 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800136 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800137 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800138 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700139
140 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
141 Queue<SwitchQueueEntry> queue = getQueue(priority);
142 if (queue == null) {
143 log.error("Unexpected priority : ", priority);
144 return false;
145 }
146 return queue.add(entry);
147 }
148
149 /**
150 * Poll single appropriate entry object according to QueueState.
151 * @return Entry object.
152 */
153 SwitchQueueEntry poll() {
154 switch (state) {
155 case READY:
156 {
157 for (int i = 0; i < raw_queues.size(); ++i) {
158 SwitchQueueEntry entry = raw_queues.get(i).poll();
159 if (entry != null) {
160 return entry;
161 }
162 }
163
164 return null;
165 }
166 case SUSPENDED:
167 {
168 // Only polling from high priority queue
169 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
170 return entry;
171 }
172 default:
173 log.error("Unexpected QueueState : ", state);
174 return null;
175 }
176 }
177
178 /**
179 * Check if this object has any messages in the queues to be sent
180 * @return True if there are some messages to be sent.
181 */
182 boolean hasMessageToSend() {
183 switch (state) {
184 case READY:
185 for (Queue<SwitchQueueEntry> queue : raw_queues) {
186 if (! queue.isEmpty()) {
187 return true;
188 }
189 }
190 break;
191 case SUSPENDED:
192 // Only checking high priority queue
193 return (! getQueue(MsgPriority.HIGH).isEmpty());
194 default:
195 log.error("Unexpected QueueState : ", state);
196 return false;
197 }
198
199 return false;
200 }
201
202 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
203 return raw_queues.get(priority.ordinal());
204 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700205 }
206
207 /**
208 * BarrierInfo holds information to specify barrier message sent to switch.
209 * @author Naoki
210 */
211 private static class BarrierInfo {
212 final long dpid;
213 final int xid;
214
215 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
216 return new BarrierInfo(sw.getId(), req.getXid());
217 }
218
219 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
220 return new BarrierInfo(sw.getId(), rpy.getXid());
221 }
222
223 private BarrierInfo(long dpid, int xid) {
224 this.dpid = dpid;
225 this.xid = xid;
226 }
227
228 // Auto generated code by Eclipse
229 @Override
230 public int hashCode() {
231 final int prime = 31;
232 int result = 1;
233 result = prime * result + (int) (dpid ^ (dpid >>> 32));
234 result = prime * result + xid;
235 return result;
236 }
237
238 @Override
239 public boolean equals(Object obj) {
240 if (this == obj)
241 return true;
242 if (obj == null)
243 return false;
244 if (getClass() != obj.getClass())
245 return false;
246
247 BarrierInfo other = (BarrierInfo) obj;
248 return (this.dpid == other.dpid) && (this.xid == other.xid);
249 }
250
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800251
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700252 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800253
Naoki Shiotac1601d32013-11-20 10:47:34 -0800254 private OFMessageDamper messageDamper = null;
255 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700256
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800257 private FloodlightContext context = null;
258 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800259
260 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800261 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700262 // Map from (DPID and transaction ID) to Future objects.
263 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
264 = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800265
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700266 private int number_thread;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800267
Naoki Shiota8739faa2013-11-18 17:00:25 -0800268 /**
269 * Main thread that reads messages from queues and sends them to switches.
270 * @author Naoki Shiota
271 *
272 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800273 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800274 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700275 = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800276
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800277 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800278 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800279
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700280 @Override
281 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800282 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700283 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800284 try {
285 // wait for message pushed to queue
286 mutex.acquire();
287 } catch (InterruptedException e) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700288 // Interrupted to be shut down (not an error)
Naoki Shiota81dbe302013-11-21 15:35:38 -0800289 log.debug("FlowPusherThread is interrupted");
290 return;
291 }
292
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700293 // for safety of concurrent access, copy set of key objects
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800294 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700295 for (IOFSwitch sw : queues.keySet()) {
296 keys.add(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800297 }
298
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800299 for (IOFSwitch sw : keys) {
300 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800301
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700302 if (sw == null || queue == null) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700303 continue;
304 }
305
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800306 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800307 processQueue(sw, queue, MAX_MESSAGE_SEND);
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700308 if (! queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800309 // remove queue if flagged to be.
310 if (queue.toBeDeleted) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700311 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800312 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800313 } else {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700314 // Free the latch if message remains in queue
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800315 if (mutex.availablePermits() == 0) {
316 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800317 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700318 }
319 }
320 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700321 }
322 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800323
324 /**
325 * Read messages from queue and send them to the switch.
326 * If number of messages excess the limit, stop sending messages.
327 * @param sw Switch to which messages will be sent.
328 * @param queue Queue of messages.
329 * @param max_msg Limitation of number of messages to be sent. If set to 0,
330 * all messages in queue will be sent.
331 */
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700332 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800333 // check sending rate and determine it to be sent or not
334 long current_time = System.currentTimeMillis();
335 long size = 0;
336
337 if (queue.isSendable(current_time)) {
338 int i = 0;
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700339 while (queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800340 // Number of messages excess the limit
341 if (0 < max_msg && max_msg <= i) {
342 break;
343 }
344 ++i;
345
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700346 SwitchQueueEntry queueEntry;
347 synchronized (queue) {
348 queueEntry = queue.poll();
349 }
350
351 OFMessage msg = queueEntry.getOFMessage();
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800352 try {
353 messageDamper.write(sw, msg, context);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700354 if (log.isTraceEnabled()) {
355 log.trace("Pusher sends message : {}", msg);
356 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800357 size += msg.getLength();
358 } catch (IOException e) {
359 e.printStackTrace();
360 log.error("Exception in sending message ({}) : {}", msg, e);
361 }
362 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700363
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800364 sw.flush();
365 queue.logSentData(current_time, size);
366 }
367 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700368 }
369
/**
 * Create a FlowPusher that uses the default number of threads
 * (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this.number_thread = DEFAULT_NUMBER_THREAD;
}
376
/**
 * Create a FlowPusher with the given number of threads.
 * @param number_thread Number of threads to handle messages. A value that
 *        is not positive falls back to DEFAULT_NUMBER_THREAD.
 */
public FlowPusher(int number_thread) {
    this.number_thread = (number_thread > 0)
            ? number_thread
            : DEFAULT_NUMBER_THREAD;
}
388
Naoki Shiotac1601d32013-11-20 10:47:34 -0800389 /**
390 * Set parameters needed for sending messages.
391 * @param context FloodlightContext used for sending messages.
392 * If null, FlowPusher uses default context.
393 * @param modContext FloodlightModuleContext used for acquiring
394 * ThreadPoolService and registering MessageListener.
395 * @param factory Factory object to create OFMessage objects.
396 * @param damper Message damper used for sending messages.
397 * If null, FlowPusher creates its own damper object.
398 */
399 public void init(FloodlightContext context,
400 FloodlightModuleContext modContext,
401 BasicFactory factory,
402 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700403 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800404 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800405 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
406 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
407 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800408
409 if (damper != null) {
410 messageDamper = damper;
411 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800412 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800413 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
414 EnumSet.of(OFType.FLOW_MOD),
415 OFMESSAGE_DAMPER_TIMEOUT);
416 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700417 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800418
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800419 /**
420 * Begin processing queue.
421 */
422 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800423 if (factory == null) {
424 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800425 return;
426 }
427
Naoki Shiota81dbe302013-11-21 15:35:38 -0800428 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800429 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800430 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800431
Naoki Shiota81dbe302013-11-21 15:35:38 -0800432 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800433 thread.start();
434 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700435 }
436
Brian O'Connor8c166a72013-11-14 18:41:48 -0800437 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800438 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800439 SwitchQueue queue = getQueue(sw);
440
441 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700442 // create queue in case suspend is called before first message addition
443 queue = createQueueImpl(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800444 }
445
446 synchronized (queue) {
447 if (queue.state == QueueState.READY) {
448 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800449 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800450 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800451 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800452 }
453 }
454
Brian O'Connor8c166a72013-11-14 18:41:48 -0800455 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800456 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800457 SwitchQueue queue = getQueue(sw);
458
459 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700460 log.error("No queue is attached to DPID : {}", sw.getId());
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800461 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800462 }
463
464 synchronized (queue) {
465 if (queue.state == QueueState.SUSPENDED) {
466 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800467
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700468 // Free the latch if queue has any messages
469 FlowPusherThread thread = getProcessingThread(sw);
470 if (queue.hasMessageToSend() &&
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800471 thread.mutex.availablePermits() == 0) {
472 thread.mutex.release();
473 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800474 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800475 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800476 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800477 }
478 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800479
Brian O'Connor8c166a72013-11-14 18:41:48 -0800480 @Override
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700481 public QueueState getState(IOFSwitch sw) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800482 SwitchQueue queue = getQueue(sw);
483
484 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700485 return QueueState.UNKNOWN;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800486 }
487
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700488 return queue.state;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800489 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800490
491 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800492 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800493 */
494 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800495 if (threadMap == null) {
496 return;
497 }
498
Naoki Shiota81dbe302013-11-21 15:35:38 -0800499 for (FlowPusherThread t : threadMap.values()) {
500 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800501 }
502 }
503
Naoki Shiotae3199732013-11-25 16:14:43 -0800504 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800505 public void setRate(IOFSwitch sw, long rate) {
506 SwitchQueue queue = getQueue(sw);
507 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700508 queue = createQueueImpl(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800509 }
510
511 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800512 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700513 synchronized (queue) {
514 queue.max_rate = rate;
515 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700516 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700517 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800518
519 @Override
520 public boolean createQueue(IOFSwitch sw) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700521 SwitchQueue queue = createQueueImpl(sw);
522
523 return (queue != null);
524 }
525
526 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800527 SwitchQueue queue = getQueue(sw);
528 if (queue != null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700529 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800530 }
531
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700532 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800533 queue = new SwitchQueue();
534 queue.state = QueueState.READY;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700535 proc.queues.put(sw, queue);
Naoki Shiotae3199732013-11-25 16:14:43 -0800536
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700537 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800538 }
539
540 @Override
541 public boolean deleteQueue(IOFSwitch sw) {
542 return deleteQueue(sw, false);
543 }
544
545 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800546 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700547 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800548
Naoki Shiotab485d412013-11-26 12:04:19 -0800549 if (forceStop) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700550 SwitchQueue queue = proc.queues.remove(sw);
551 if (queue == null) {
552 return false;
Naoki Shiotae3199732013-11-25 16:14:43 -0800553 }
554 return true;
555 } else {
556 SwitchQueue queue = getQueue(sw);
557 if (queue == null) {
558 return false;
559 }
560 synchronized (queue) {
561 queue.toBeDeleted = true;
562 }
563 return true;
564 }
565 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700566
Brian O'Connor8c166a72013-11-14 18:41:48 -0800567 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800568 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700569 return add(sw, msg, MsgPriority.NORMAL);
570 }
571
572 @Override
573 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
574 return addMessageImpl(sw, msg, priority);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800575 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700576
Brian O'Connor8c166a72013-11-14 18:41:48 -0800577 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800578 public void pushFlowEntries(
579 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700580 pushFlowEntries(entries, MsgPriority.NORMAL);
581 }
582
583 @Override
584 public void pushFlowEntries(
585 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800586
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800587 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700588 add(entry.first, entry.second, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800589 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800590 }
591
592 @Override
593 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700594 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
595 }
596
597 @Override
598 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800599 Collection<Pair<IOFSwitch, FlowEntry>> entries =
600 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
601
602 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700603 pushFlowEntries(entries, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800604 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700605
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800606 /**
607 * Create a message from FlowEntry and add it to the queue of the switch.
608 * @param sw Switch to which message is pushed.
609 * @param flowEntry FlowEntry object used for creating message.
610 * @return true if message is successfully added to a queue.
611 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700612 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800613 //
614 // Create the OpenFlow Flow Modification Entry to push
615 //
616 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
617 long cookie = flowEntry.flowEntryId().value();
618
619 short flowModCommand = OFFlowMod.OFPFC_ADD;
620 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
621 flowModCommand = OFFlowMod.OFPFC_ADD;
622 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
623 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
624 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
625 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
626 } else {
627 // Unknown user state. Ignore the entry
628 log.debug(
629 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800630 flowEntry.flowEntryId(),
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800631 flowEntry.flowEntryUserState());
632 return false;
633 }
634
635 //
636 // Fetch the match conditions.
637 //
638 // NOTE: The Flow matching conditions common for all Flow Entries are
639 // used ONLY if a Flow Entry does NOT have the corresponding matching
640 // condition set.
641 //
642 OFMatch match = new OFMatch();
643 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800644 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
645
646 // Match the Incoming Port
647 Port matchInPort = flowEntryMatch.inPort();
648 if (matchInPort != null) {
649 match.setInputPort(matchInPort.value());
650 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
651 }
652
653 // Match the Source MAC address
654 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800655 if (matchSrcMac != null) {
656 match.setDataLayerSource(matchSrcMac.toString());
657 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
658 }
659
660 // Match the Destination MAC address
661 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800662 if (matchDstMac != null) {
663 match.setDataLayerDestination(matchDstMac.toString());
664 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
665 }
666
667 // Match the Ethernet Frame Type
668 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800669 if (matchEthernetFrameType != null) {
670 match.setDataLayerType(matchEthernetFrameType);
671 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
672 }
673
674 // Match the VLAN ID
675 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800676 if (matchVlanId != null) {
677 match.setDataLayerVirtualLan(matchVlanId);
678 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
679 }
680
681 // Match the VLAN priority
682 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800683 if (matchVlanPriority != null) {
684 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
685 match.setWildcards(match.getWildcards()
686 & ~OFMatch.OFPFW_DL_VLAN_PCP);
687 }
688
689 // Match the Source IPv4 Network prefix
690 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800691 if (matchSrcIPv4Net != null) {
692 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
693 }
694
// Match the Destination IPv4 Network prefix
696 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800697 if (matchDstIPv4Net != null) {
698 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
699 }
700
701 // Match the IP protocol
702 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800703 if (matchIpProto != null) {
704 match.setNetworkProtocol(matchIpProto);
705 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
706 }
707
708 // Match the IP ToS (DSCP field, 6 bits)
709 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800710 if (matchIpToS != null) {
711 match.setNetworkTypeOfService(matchIpToS);
712 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
713 }
714
715 // Match the Source TCP/UDP port
716 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800717 if (matchSrcTcpUdpPort != null) {
718 match.setTransportSource(matchSrcTcpUdpPort);
719 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
720 }
721
722 // Match the Destination TCP/UDP port
723 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800724 if (matchDstTcpUdpPort != null) {
725 match.setTransportDestination(matchDstTcpUdpPort);
726 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
727 }
728
729 //
730 // Fetch the actions
731 //
732 Short actionOutputPort = null;
733 List<OFAction> openFlowActions = new ArrayList<OFAction>();
734 int actionsLen = 0;
735 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
736 //
737 for (FlowEntryAction action : flowEntryActions.actions()) {
738 ActionOutput actionOutput = action.actionOutput();
739 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
740 ActionSetVlanPriority actionSetVlanPriority = action
741 .actionSetVlanPriority();
742 ActionStripVlan actionStripVlan = action.actionStripVlan();
743 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
744 .actionSetEthernetSrcAddr();
745 ActionSetEthernetAddr actionSetEthernetDstAddr = action
746 .actionSetEthernetDstAddr();
747 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
748 .actionSetIPv4SrcAddr();
749 ActionSetIPv4Addr actionSetIPv4DstAddr = action
750 .actionSetIPv4DstAddr();
751 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
752 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
753 .actionSetTcpUdpSrcPort();
754 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
755 .actionSetTcpUdpDstPort();
756 ActionEnqueue actionEnqueue = action.actionEnqueue();
757
758 if (actionOutput != null) {
759 actionOutputPort = actionOutput.port().value();
760 // XXX: The max length is hard-coded for now
761 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
762 .value(), (short) 0xffff);
763 openFlowActions.add(ofa);
764 actionsLen += ofa.getLength();
765 }
766
767 if (actionSetVlanId != null) {
768 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
769 actionSetVlanId.vlanId());
770 openFlowActions.add(ofa);
771 actionsLen += ofa.getLength();
772 }
773
774 if (actionSetVlanPriority != null) {
775 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
776 actionSetVlanPriority.vlanPriority());
777 openFlowActions.add(ofa);
778 actionsLen += ofa.getLength();
779 }
780
781 if (actionStripVlan != null) {
782 if (actionStripVlan.stripVlan() == true) {
783 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
784 openFlowActions.add(ofa);
785 actionsLen += ofa.getLength();
786 }
787 }
788
789 if (actionSetEthernetSrcAddr != null) {
790 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
791 actionSetEthernetSrcAddr.addr().toBytes());
792 openFlowActions.add(ofa);
793 actionsLen += ofa.getLength();
794 }
795
796 if (actionSetEthernetDstAddr != null) {
797 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
798 actionSetEthernetDstAddr.addr().toBytes());
799 openFlowActions.add(ofa);
800 actionsLen += ofa.getLength();
801 }
802
803 if (actionSetIPv4SrcAddr != null) {
804 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
805 actionSetIPv4SrcAddr.addr().value());
806 openFlowActions.add(ofa);
807 actionsLen += ofa.getLength();
808 }
809
810 if (actionSetIPv4DstAddr != null) {
811 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
812 actionSetIPv4DstAddr.addr().value());
813 openFlowActions.add(ofa);
814 actionsLen += ofa.getLength();
815 }
816
817 if (actionSetIpToS != null) {
818 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
819 actionSetIpToS.ipToS());
820 openFlowActions.add(ofa);
821 actionsLen += ofa.getLength();
822 }
823
824 if (actionSetTcpUdpSrcPort != null) {
825 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
826 actionSetTcpUdpSrcPort.port());
827 openFlowActions.add(ofa);
828 actionsLen += ofa.getLength();
829 }
830
831 if (actionSetTcpUdpDstPort != null) {
832 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
833 actionSetTcpUdpDstPort.port());
834 openFlowActions.add(ofa);
835 actionsLen += ofa.getLength();
836 }
837
838 if (actionEnqueue != null) {
839 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
840 .value(), actionEnqueue.queueId());
841 openFlowActions.add(ofa);
842 actionsLen += ofa.getLength();
843 }
844 }
845
Pavlin Radoslavov1fe06a22013-12-10 14:12:23 -0800846 fm.setIdleTimeout((short)flowEntry.idleTimeout())
847 .setHardTimeout((short)flowEntry.hardTimeout())
Pavlin Radoslavovafbf1032014-02-04 10:37:52 -0800848 .setPriority((short)flowEntry.priority())
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800849 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
850 .setCommand(flowModCommand).setMatch(match)
851 .setActions(openFlowActions)
852 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
853 fm.setOutPort(OFPort.OFPP_NONE.getValue());
854 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
855 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
856 if (actionOutputPort != null)
857 fm.setOutPort(actionOutputPort);
858 }
859
860 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800861 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
862 // permanent.
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800863 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800864 if ((flowEntry.idleTimeout() != 0) ||
865 (flowEntry.hardTimeout() != 0)) {
866 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
867 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800868
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700869 if (log.isTraceEnabled()) {
870 log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
871 , flowEntry.flowEntryUserState()
872 , sw.getStringId()
873 , flowEntry.flowEntryId()
874 , matchSrcMac
875 , matchDstMac
876 , matchInPort
877 , actionOutputPort
878 );
879 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800880
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700881 return addMessageImpl(sw, fm, priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700882 }
883
884 /**
885 * Add message to queue
886 * @param sw
887 * @param msg
888 * @param flowEntryId
889 * @return
890 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700891 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
892 FlowPusherThread proc = getProcessingThread(sw);
893 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700894
895 // create queue at first addition of message
896 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700897 queue = createQueueImpl(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700898 }
899
900 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
901 synchronized (queue) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700902 queue.add(entry,priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700903 if (log.isTraceEnabled()) {
904 log.trace("Message is pushed : {}", entry.getOFMessage());
905 }
906 }
907
908 if (proc.mutex.availablePermits() == 0) {
909 proc.mutex.release();
910 }
911
912 return true;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800913 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800914
915 @Override
916 public OFBarrierReply barrier(IOFSwitch sw) {
917 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
918 if (future == null) {
919 return null;
920 }
921
922 try {
923 return future.get();
924 } catch (InterruptedException e) {
925 e.printStackTrace();
926 log.error("InterruptedException: {}", e);
927 return null;
928 } catch (ExecutionException e) {
929 e.printStackTrace();
930 log.error("ExecutionException: {}", e);
931 return null;
932 }
933 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800934
Naoki Shiotac1601d32013-11-20 10:47:34 -0800935 @Override
936 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
937 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800938
939 if (sw == null) {
940 return null;
941 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800942
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700943 OFBarrierRequest msg = createBarrierRequest(sw);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800944
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700945 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
946 barrierFutures.put(BarrierInfo.create(sw,msg), future);
947
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700948 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800949
Naoki Shiotac1601d32013-11-20 10:47:34 -0800950 return future;
951 }
952
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700953 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
954 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
955 msg.setXid(sw.getNextTransactionId());
956
957 return msg;
958 }
959
Naoki Shiotae3199732013-11-25 16:14:43 -0800960 /**
961 * Get a queue attached to a switch.
962 * @param sw Switch object
963 * @return Queue object
964 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800965 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800966 if (sw == null) {
967 return null;
968 }
969
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700970 FlowPusherThread th = getProcessingThread(sw);
971 if (th == null) {
972 return null;
973 }
974
975 return th.queues.get(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800976 }
977
Naoki Shiotae3199732013-11-25 16:14:43 -0800978 /**
979 * Get a hash value correspondent to a switch.
980 * @param sw Switch object
981 * @return Hash value
982 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800983 protected long getHash(IOFSwitch sw) {
984 // This code assumes DPID is sequentially assigned.
985 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800986 return sw.getId() % number_thread;
987 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800988
989 /**
990 * Get a Thread object which processes the queue attached to a switch.
991 * @param sw Switch object
992 * @return Thread object
993 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700994 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800995 long hash = getHash(sw);
996
997 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800998 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700999
Naoki Shiotac1601d32013-11-20 10:47:34 -08001000 @Override
1001 public String getName() {
1002 return "flowpusher";
1003 }
1004
1005 @Override
1006 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1007 return false;
1008 }
1009
1010 @Override
1011 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1012 return false;
1013 }
1014
1015 @Override
1016 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001017 if (log.isTraceEnabled()) {
1018 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1019 }
1020
1021 if (msg.getType() != OFType.BARRIER_REPLY) {
1022 log.error("Unexpected reply message : {}", msg.getType());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001023 return Command.CONTINUE;
1024 }
1025
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001026 OFBarrierReply reply = (OFBarrierReply) msg;
1027 BarrierInfo info = BarrierInfo.create(sw,reply);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001028
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001029 // Deliver future if exists
1030 OFBarrierReplyFuture future = barrierFutures.get(info);
1031 if (future != null) {
1032 future.deliverFuture(sw, msg);
1033 barrierFutures.remove(info);
1034 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001035
1036 return Command.CONTINUE;
1037 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001038}