blob: b52bafcd034dc14aacb172f882a12029dad8d5f5 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiota7d0cf272013-11-05 10:18:12 -080021import org.openflow.protocol.*;
22import org.openflow.protocol.action.*;
23import org.openflow.protocol.factory.BasicFactory;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070026
27import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080028import net.floodlightcontroller.core.IFloodlightProviderService;
29import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070030import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080031import net.floodlightcontroller.core.internal.OFMessageFuture;
32import net.floodlightcontroller.core.module.FloodlightModuleContext;
33import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080034import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080035import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryAction;
37import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080038import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080039import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080040import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
41import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080042import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080043import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080044import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045
/**
 * FlowPusher is an implementation of IFlowPusherService.
 * FlowPusher assigns one message queue instance to each switch.
 * The number of message processing threads is configurable via the constructor, and
 * one thread can handle multiple message queues. Each queue will be assigned to
 * a thread according to the hash function defined by getHash().
 * Each processing thread reads messages from queues and sends them to switches
 * in round-robin. Each processing thread also calculates the sending rate to suppress
 * excessive message sending.
 * @author Naoki Shiota
 *
 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080058public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070060 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080061
Naoki Shiota7d0cf272013-11-05 10:18:12 -080062 // TODO: Values copied from elsewhere (class LearningSwitch).
63 // The local copy should go away!
64 //
65 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
66 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080067
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080068 // Number of messages sent to switch at once
69 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080070
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070071 private static class SwitchQueueEntry {
72 OFMessage msg;
73
74 public SwitchQueueEntry(OFMessage msg) {
75 this.msg = msg;
76 }
77
78 public OFMessage getOFMessage() {
79 return msg;
80 }
81 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070082
Naoki Shiotac1601d32013-11-20 10:47:34 -080083 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080084 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080085 * This consists of queue itself and variables used for limiting sending rate.
86 * @author Naoki Shiota
87 *
88 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -070089 private class SwitchQueue {
90 List<Queue<SwitchQueueEntry>> raw_queues;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091 QueueState state;
92
Naoki Shiotae3199732013-11-25 16:14:43 -080093 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080094 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070095 long last_sent_time = 0;
96 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097
Naoki Shiotae3199732013-11-25 16:14:43 -080098 // "To be deleted" flag
99 boolean toBeDeleted = false;
100
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700101 SwitchQueue() {
102 raw_queues = new ArrayList<Queue<SwitchQueueEntry>>(
103 MsgPriority.values().length);
104 for (int i = 0; i < MsgPriority.values().length; ++i) {
105 raw_queues.add(i, new ArrayDeque<SwitchQueueEntry>());
106 }
107
108 state = QueueState.READY;
109 }
110
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800111 /**
112 * Check if sending rate is within the rate
113 * @param current Current time
114 * @return true if within the rate
115 */
116 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800117 if (max_rate == 0) {
118 // no limitation
119 return true;
120 }
121
Naoki Shiota81dbe302013-11-21 15:35:38 -0800122 if (current == last_sent_time) {
123 return false;
124 }
125
Naoki Shiotac1601d32013-11-20 10:47:34 -0800126 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800127 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800128 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800129 }
130
Naoki Shiota81dbe302013-11-21 15:35:38 -0800131 /**
132 * Log time and size of last sent data.
133 * @param current Time to be sent.
134 * @param size Size of sent data (in bytes).
135 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800136 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800137 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800138 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800139 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700140
141 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
142 Queue<SwitchQueueEntry> queue = getQueue(priority);
143 if (queue == null) {
144 log.error("Unexpected priority : ", priority);
145 return false;
146 }
147 return queue.add(entry);
148 }
149
150 /**
151 * Poll single appropriate entry object according to QueueState.
152 * @return Entry object.
153 */
154 SwitchQueueEntry poll() {
155 switch (state) {
156 case READY:
157 {
158 for (int i = 0; i < raw_queues.size(); ++i) {
159 SwitchQueueEntry entry = raw_queues.get(i).poll();
160 if (entry != null) {
161 return entry;
162 }
163 }
164
165 return null;
166 }
167 case SUSPENDED:
168 {
169 // Only polling from high priority queue
170 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
171 return entry;
172 }
173 default:
174 log.error("Unexpected QueueState : ", state);
175 return null;
176 }
177 }
178
179 /**
180 * Check if this object has any messages in the queues to be sent
181 * @return True if there are some messages to be sent.
182 */
183 boolean hasMessageToSend() {
184 switch (state) {
185 case READY:
186 for (Queue<SwitchQueueEntry> queue : raw_queues) {
187 if (! queue.isEmpty()) {
188 return true;
189 }
190 }
191 break;
192 case SUSPENDED:
193 // Only checking high priority queue
194 return (! getQueue(MsgPriority.HIGH).isEmpty());
195 default:
196 log.error("Unexpected QueueState : ", state);
197 return false;
198 }
199
200 return false;
201 }
202
203 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
204 return raw_queues.get(priority.ordinal());
205 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700206 }
207
208 /**
209 * BarrierInfo holds information to specify barrier message sent to switch.
210 * @author Naoki
211 */
212 private static class BarrierInfo {
213 final long dpid;
214 final int xid;
215
216 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
217 return new BarrierInfo(sw.getId(), req.getXid());
218 }
219
220 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
221 return new BarrierInfo(sw.getId(), rpy.getXid());
222 }
223
224 private BarrierInfo(long dpid, int xid) {
225 this.dpid = dpid;
226 this.xid = xid;
227 }
228
229 // Auto generated code by Eclipse
230 @Override
231 public int hashCode() {
232 final int prime = 31;
233 int result = 1;
234 result = prime * result + (int) (dpid ^ (dpid >>> 32));
235 result = prime * result + xid;
236 return result;
237 }
238
239 @Override
240 public boolean equals(Object obj) {
241 if (this == obj)
242 return true;
243 if (obj == null)
244 return false;
245 if (getClass() != obj.getClass())
246 return false;
247
248 BarrierInfo other = (BarrierInfo) obj;
249 return (this.dpid == other.dpid) && (this.xid == other.xid);
250 }
251
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800252
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700253 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800254
Naoki Shiotac1601d32013-11-20 10:47:34 -0800255 private OFMessageDamper messageDamper = null;
256 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700257
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800258 private FloodlightContext context = null;
259 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800260
261 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800262 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700263 // Map from (DPID and transaction ID) to Future objects.
264 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
265 = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800266
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700267 private int number_thread;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800268
Naoki Shiota8739faa2013-11-18 17:00:25 -0800269 /**
270 * Main thread that reads messages from queues and sends them to switches.
271 * @author Naoki Shiota
272 *
273 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800274 private class FlowPusherThread extends Thread {
Naoki Shiota05334692014-03-18 16:06:36 -0700275 private Map<IOFSwitch,SwitchQueue> assignedQueues
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700276 = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800277
Naoki Shiota05334692014-03-18 16:06:36 -0700278 final Lock queuingLock = new ReentrantLock();
279 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800280
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700281 @Override
282 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800283 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700284 while (true) {
Naoki Shiota05334692014-03-18 16:06:36 -0700285 while (! queuesHasMessageToSend()) {
286 queuingLock.lock();
287
288 try {
289 // wait for message pushed to queue
290 messagePushed.await();
291 } catch (InterruptedException e) {
292 // Interrupted to be shut down (not an error)
293 log.debug("FlowPusherThread is interrupted");
294 return;
295 } finally {
296 queuingLock.unlock();
297 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800298 }
299
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700300 // for safety of concurrent access, copy set of key objects
Naoki Shiota05334692014-03-18 16:06:36 -0700301 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
302 for (IOFSwitch sw : assignedQueues.keySet()) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700303 keys.add(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800304 }
305
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800306 for (IOFSwitch sw : keys) {
Naoki Shiota05334692014-03-18 16:06:36 -0700307 SwitchQueue queue = assignedQueues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800308
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700309 if (sw == null || queue == null) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700310 continue;
311 }
312
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800313 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800314 processQueue(sw, queue, MAX_MESSAGE_SEND);
Naoki Shiota05334692014-03-18 16:06:36 -0700315 if (queue.toBeDeleted && ! queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800316 // remove queue if flagged to be.
Naoki Shiota05334692014-03-18 16:06:36 -0700317 assignedQueues.remove(sw);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700318 }
319 }
320 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700321 }
322 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800323
324 /**
325 * Read messages from queue and send them to the switch.
326 * If number of messages excess the limit, stop sending messages.
327 * @param sw Switch to which messages will be sent.
328 * @param queue Queue of messages.
329 * @param max_msg Limitation of number of messages to be sent. If set to 0,
330 * all messages in queue will be sent.
331 */
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700332 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800333 // check sending rate and determine it to be sent or not
334 long current_time = System.currentTimeMillis();
335 long size = 0;
336
337 if (queue.isSendable(current_time)) {
338 int i = 0;
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700339 while (queue.hasMessageToSend()) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800340 // Number of messages excess the limit
341 if (0 < max_msg && max_msg <= i) {
342 break;
343 }
344 ++i;
345
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700346 SwitchQueueEntry queueEntry;
347 synchronized (queue) {
348 queueEntry = queue.poll();
349 }
350
351 OFMessage msg = queueEntry.getOFMessage();
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800352 try {
353 messageDamper.write(sw, msg, context);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700354 if (log.isTraceEnabled()) {
355 log.trace("Pusher sends message : {}", msg);
356 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800357 size += msg.getLength();
358 } catch (IOException e) {
359 e.printStackTrace();
360 log.error("Exception in sending message ({}) : {}", msg, e);
361 }
362 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700363
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800364 sw.flush();
365 queue.logSentData(current_time, size);
366 }
367 }
Naoki Shiota05334692014-03-18 16:06:36 -0700368
369 private boolean queuesHasMessageToSend() {
370 for (SwitchQueue queue : assignedQueues.values()) {
371 if (queue.hasMessageToSend()) {
372 return true;
373 }
374 }
375
376 return false;
377 }
378
379 private void notifyMessagePushed() {
380 queuingLock.lock();
381 try {
382 messagePushed.signal();
383 } finally {
384 queuingLock.unlock();
385 }
386 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700387 }
388
/**
 * Create a FlowPusher that uses the default number of processing
 * threads (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this.number_thread = DEFAULT_NUMBER_THREAD;
}
395
/**
 * Create a FlowPusher with the given number of processing threads.
 * A non-positive value falls back to DEFAULT_NUMBER_THREAD.
 * @param number_thread Number of threads to handle messages.
 */
public FlowPusher(int number_thread) {
    this.number_thread =
            (number_thread > 0) ? number_thread : DEFAULT_NUMBER_THREAD;
}
407
Naoki Shiotac1601d32013-11-20 10:47:34 -0800408 /**
409 * Set parameters needed for sending messages.
410 * @param context FloodlightContext used for sending messages.
411 * If null, FlowPusher uses default context.
412 * @param modContext FloodlightModuleContext used for acquiring
413 * ThreadPoolService and registering MessageListener.
414 * @param factory Factory object to create OFMessage objects.
415 * @param damper Message damper used for sending messages.
416 * If null, FlowPusher creates its own damper object.
417 */
418 public void init(FloodlightContext context,
419 FloodlightModuleContext modContext,
420 BasicFactory factory,
421 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700422 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800423 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800424 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
Naoki Shiota05334692014-03-18 16:06:36 -0700425 IFloodlightProviderService flservice
426 = modContext.getServiceImpl(IFloodlightProviderService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800427 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800428
429 if (damper != null) {
430 messageDamper = damper;
431 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800432 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800433 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
434 EnumSet.of(OFType.FLOW_MOD),
435 OFMESSAGE_DAMPER_TIMEOUT);
436 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700437 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800438
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800439 /**
440 * Begin processing queue.
441 */
442 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800443 if (factory == null) {
444 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800445 return;
446 }
447
Naoki Shiota81dbe302013-11-21 15:35:38 -0800448 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800449 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800450 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800451
Naoki Shiota81dbe302013-11-21 15:35:38 -0800452 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800453 thread.start();
454 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700455 }
456
Brian O'Connor8c166a72013-11-14 18:41:48 -0800457 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800458 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800459 SwitchQueue queue = getQueue(sw);
460
461 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700462 // create queue in case suspend is called before first message addition
463 queue = createQueueImpl(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800464 }
465
466 synchronized (queue) {
467 if (queue.state == QueueState.READY) {
468 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800469 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800470 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800471 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800472 }
473 }
474
Brian O'Connor8c166a72013-11-14 18:41:48 -0800475 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800476 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800477 SwitchQueue queue = getQueue(sw);
478
479 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700480 log.error("No queue is attached to DPID : {}", sw.getId());
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800481 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800482 }
483
484 synchronized (queue) {
485 if (queue.state == QueueState.SUSPENDED) {
486 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800487
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700488 // Free the latch if queue has any messages
489 FlowPusherThread thread = getProcessingThread(sw);
Naoki Shiota05334692014-03-18 16:06:36 -0700490 if (queue.hasMessageToSend()) {
491 thread.notifyMessagePushed();
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800492 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800493 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800494 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800495 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800496 }
497 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800498
Brian O'Connor8c166a72013-11-14 18:41:48 -0800499 @Override
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700500 public QueueState getState(IOFSwitch sw) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800501 SwitchQueue queue = getQueue(sw);
502
503 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700504 return QueueState.UNKNOWN;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800505 }
506
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700507 return queue.state;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800508 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800509
510 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800511 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800512 */
513 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800514 if (threadMap == null) {
515 return;
516 }
517
Naoki Shiota81dbe302013-11-21 15:35:38 -0800518 for (FlowPusherThread t : threadMap.values()) {
519 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800520 }
521 }
522
Naoki Shiotae3199732013-11-25 16:14:43 -0800523 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800524 public void setRate(IOFSwitch sw, long rate) {
525 SwitchQueue queue = getQueue(sw);
526 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700527 queue = createQueueImpl(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800528 }
529
530 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800531 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700532 synchronized (queue) {
533 queue.max_rate = rate;
534 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700535 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700536 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800537
538 @Override
539 public boolean createQueue(IOFSwitch sw) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700540 SwitchQueue queue = createQueueImpl(sw);
541
542 return (queue != null);
543 }
544
545 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800546 SwitchQueue queue = getQueue(sw);
547 if (queue != null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700548 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800549 }
550
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700551 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800552 queue = new SwitchQueue();
553 queue.state = QueueState.READY;
Naoki Shiota05334692014-03-18 16:06:36 -0700554 proc.assignedQueues.put(sw, queue);
Naoki Shiotae3199732013-11-25 16:14:43 -0800555
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700556 return queue;
Naoki Shiotae3199732013-11-25 16:14:43 -0800557 }
558
559 @Override
560 public boolean deleteQueue(IOFSwitch sw) {
561 return deleteQueue(sw, false);
562 }
563
564 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800565 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700566 FlowPusherThread proc = getProcessingThread(sw);
Naoki Shiotae3199732013-11-25 16:14:43 -0800567
Naoki Shiotab485d412013-11-26 12:04:19 -0800568 if (forceStop) {
Naoki Shiota05334692014-03-18 16:06:36 -0700569 SwitchQueue queue = proc.assignedQueues.remove(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700570 if (queue == null) {
571 return false;
Naoki Shiotae3199732013-11-25 16:14:43 -0800572 }
573 return true;
574 } else {
575 SwitchQueue queue = getQueue(sw);
576 if (queue == null) {
577 return false;
578 }
579 synchronized (queue) {
580 queue.toBeDeleted = true;
581 }
582 return true;
583 }
584 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700585
Brian O'Connor8c166a72013-11-14 18:41:48 -0800586 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800587 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700588 return add(sw, msg, MsgPriority.NORMAL);
589 }
590
591 @Override
592 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
593 return addMessageImpl(sw, msg, priority);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800594 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700595
Brian O'Connor8c166a72013-11-14 18:41:48 -0800596 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800597 public void pushFlowEntries(
598 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700599 pushFlowEntries(entries, MsgPriority.NORMAL);
600 }
601
602 @Override
603 public void pushFlowEntries(
604 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800605
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800606 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700607 add(entry.first, entry.second, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800608 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800609 }
610
611 @Override
612 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700613 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
614 }
615
616 @Override
617 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800618 Collection<Pair<IOFSwitch, FlowEntry>> entries =
619 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
620
621 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700622 pushFlowEntries(entries, priority);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800623 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700624
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800625 /**
626 * Create a message from FlowEntry and add it to the queue of the switch.
627 * @param sw Switch to which message is pushed.
628 * @param flowEntry FlowEntry object used for creating message.
629 * @return true if message is successfully added to a queue.
630 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700631 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800632 //
633 // Create the OpenFlow Flow Modification Entry to push
634 //
635 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
636 long cookie = flowEntry.flowEntryId().value();
637
638 short flowModCommand = OFFlowMod.OFPFC_ADD;
639 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
640 flowModCommand = OFFlowMod.OFPFC_ADD;
641 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
642 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
643 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
644 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
645 } else {
646 // Unknown user state. Ignore the entry
647 log.debug(
648 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800649 flowEntry.flowEntryId(),
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800650 flowEntry.flowEntryUserState());
651 return false;
652 }
653
654 //
655 // Fetch the match conditions.
656 //
657 // NOTE: The Flow matching conditions common for all Flow Entries are
658 // used ONLY if a Flow Entry does NOT have the corresponding matching
659 // condition set.
660 //
661 OFMatch match = new OFMatch();
662 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800663 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
664
665 // Match the Incoming Port
666 Port matchInPort = flowEntryMatch.inPort();
667 if (matchInPort != null) {
668 match.setInputPort(matchInPort.value());
669 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
670 }
671
672 // Match the Source MAC address
673 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800674 if (matchSrcMac != null) {
675 match.setDataLayerSource(matchSrcMac.toString());
676 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
677 }
678
679 // Match the Destination MAC address
680 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800681 if (matchDstMac != null) {
682 match.setDataLayerDestination(matchDstMac.toString());
683 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
684 }
685
686 // Match the Ethernet Frame Type
687 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800688 if (matchEthernetFrameType != null) {
689 match.setDataLayerType(matchEthernetFrameType);
690 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
691 }
692
693 // Match the VLAN ID
694 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800695 if (matchVlanId != null) {
696 match.setDataLayerVirtualLan(matchVlanId);
697 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
698 }
699
700 // Match the VLAN priority
701 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800702 if (matchVlanPriority != null) {
703 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
704 match.setWildcards(match.getWildcards()
705 & ~OFMatch.OFPFW_DL_VLAN_PCP);
706 }
707
708 // Match the Source IPv4 Network prefix
709 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800710 if (matchSrcIPv4Net != null) {
711 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
712 }
713
714 // Natch the Destination IPv4 Network prefix
715 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800716 if (matchDstIPv4Net != null) {
717 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
718 }
719
720 // Match the IP protocol
721 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800722 if (matchIpProto != null) {
723 match.setNetworkProtocol(matchIpProto);
724 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
725 }
726
727 // Match the IP ToS (DSCP field, 6 bits)
728 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800729 if (matchIpToS != null) {
730 match.setNetworkTypeOfService(matchIpToS);
731 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
732 }
733
734 // Match the Source TCP/UDP port
735 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800736 if (matchSrcTcpUdpPort != null) {
737 match.setTransportSource(matchSrcTcpUdpPort);
738 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
739 }
740
741 // Match the Destination TCP/UDP port
742 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800743 if (matchDstTcpUdpPort != null) {
744 match.setTransportDestination(matchDstTcpUdpPort);
745 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
746 }
747
748 //
749 // Fetch the actions
750 //
751 Short actionOutputPort = null;
752 List<OFAction> openFlowActions = new ArrayList<OFAction>();
753 int actionsLen = 0;
754 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
755 //
756 for (FlowEntryAction action : flowEntryActions.actions()) {
757 ActionOutput actionOutput = action.actionOutput();
758 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
759 ActionSetVlanPriority actionSetVlanPriority = action
760 .actionSetVlanPriority();
761 ActionStripVlan actionStripVlan = action.actionStripVlan();
762 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
763 .actionSetEthernetSrcAddr();
764 ActionSetEthernetAddr actionSetEthernetDstAddr = action
765 .actionSetEthernetDstAddr();
766 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
767 .actionSetIPv4SrcAddr();
768 ActionSetIPv4Addr actionSetIPv4DstAddr = action
769 .actionSetIPv4DstAddr();
770 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
771 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
772 .actionSetTcpUdpSrcPort();
773 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
774 .actionSetTcpUdpDstPort();
775 ActionEnqueue actionEnqueue = action.actionEnqueue();
776
777 if (actionOutput != null) {
778 actionOutputPort = actionOutput.port().value();
779 // XXX: The max length is hard-coded for now
780 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
781 .value(), (short) 0xffff);
782 openFlowActions.add(ofa);
783 actionsLen += ofa.getLength();
784 }
785
786 if (actionSetVlanId != null) {
787 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
788 actionSetVlanId.vlanId());
789 openFlowActions.add(ofa);
790 actionsLen += ofa.getLength();
791 }
792
793 if (actionSetVlanPriority != null) {
794 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
795 actionSetVlanPriority.vlanPriority());
796 openFlowActions.add(ofa);
797 actionsLen += ofa.getLength();
798 }
799
800 if (actionStripVlan != null) {
801 if (actionStripVlan.stripVlan() == true) {
802 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
803 openFlowActions.add(ofa);
804 actionsLen += ofa.getLength();
805 }
806 }
807
808 if (actionSetEthernetSrcAddr != null) {
809 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
810 actionSetEthernetSrcAddr.addr().toBytes());
811 openFlowActions.add(ofa);
812 actionsLen += ofa.getLength();
813 }
814
815 if (actionSetEthernetDstAddr != null) {
816 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
817 actionSetEthernetDstAddr.addr().toBytes());
818 openFlowActions.add(ofa);
819 actionsLen += ofa.getLength();
820 }
821
822 if (actionSetIPv4SrcAddr != null) {
823 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
824 actionSetIPv4SrcAddr.addr().value());
825 openFlowActions.add(ofa);
826 actionsLen += ofa.getLength();
827 }
828
829 if (actionSetIPv4DstAddr != null) {
830 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
831 actionSetIPv4DstAddr.addr().value());
832 openFlowActions.add(ofa);
833 actionsLen += ofa.getLength();
834 }
835
836 if (actionSetIpToS != null) {
837 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
838 actionSetIpToS.ipToS());
839 openFlowActions.add(ofa);
840 actionsLen += ofa.getLength();
841 }
842
843 if (actionSetTcpUdpSrcPort != null) {
844 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
845 actionSetTcpUdpSrcPort.port());
846 openFlowActions.add(ofa);
847 actionsLen += ofa.getLength();
848 }
849
850 if (actionSetTcpUdpDstPort != null) {
851 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
852 actionSetTcpUdpDstPort.port());
853 openFlowActions.add(ofa);
854 actionsLen += ofa.getLength();
855 }
856
857 if (actionEnqueue != null) {
858 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
859 .value(), actionEnqueue.queueId());
860 openFlowActions.add(ofa);
861 actionsLen += ofa.getLength();
862 }
863 }
864
Pavlin Radoslavov1fe06a22013-12-10 14:12:23 -0800865 fm.setIdleTimeout((short)flowEntry.idleTimeout())
866 .setHardTimeout((short)flowEntry.hardTimeout())
Pavlin Radoslavovafbf1032014-02-04 10:37:52 -0800867 .setPriority((short)flowEntry.priority())
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800868 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
869 .setCommand(flowModCommand).setMatch(match)
870 .setActions(openFlowActions)
871 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
872 fm.setOutPort(OFPort.OFPP_NONE.getValue());
873 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
874 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
875 if (actionOutputPort != null)
876 fm.setOutPort(actionOutputPort);
877 }
878
879 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800880 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
881 // permanent.
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800882 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800883 if ((flowEntry.idleTimeout() != 0) ||
884 (flowEntry.hardTimeout() != 0)) {
885 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
886 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800887
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700888 if (log.isTraceEnabled()) {
889 log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
890 , flowEntry.flowEntryUserState()
891 , sw.getStringId()
892 , flowEntry.flowEntryId()
893 , matchSrcMac
894 , matchDstMac
895 , matchInPort
896 , actionOutputPort
897 );
898 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800899
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700900 return addMessageImpl(sw, fm, priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700901 }
902
903 /**
904 * Add message to queue
905 * @param sw
906 * @param msg
907 * @param flowEntryId
908 * @return
909 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700910 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
Naoki Shiota05334692014-03-18 16:06:36 -0700911 FlowPusherThread thread = getProcessingThread(sw);
912
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700913 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700914
915 // create queue at first addition of message
916 if (queue == null) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700917 queue = createQueueImpl(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700918 }
919
920 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
Naoki Shiota05334692014-03-18 16:06:36 -0700921
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700922 synchronized (queue) {
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700923 queue.add(entry,priority);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700924 if (log.isTraceEnabled()) {
925 log.trace("Message is pushed : {}", entry.getOFMessage());
926 }
927 }
928
Naoki Shiota05334692014-03-18 16:06:36 -0700929 thread.notifyMessagePushed();
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700930
931 return true;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800932 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800933
934 @Override
935 public OFBarrierReply barrier(IOFSwitch sw) {
936 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
937 if (future == null) {
938 return null;
939 }
940
941 try {
942 return future.get();
943 } catch (InterruptedException e) {
944 e.printStackTrace();
945 log.error("InterruptedException: {}", e);
946 return null;
947 } catch (ExecutionException e) {
948 e.printStackTrace();
949 log.error("ExecutionException: {}", e);
950 return null;
951 }
952 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800953
Naoki Shiotac1601d32013-11-20 10:47:34 -0800954 @Override
955 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
956 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800957
958 if (sw == null) {
959 return null;
960 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800961
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700962 OFBarrierRequest msg = createBarrierRequest(sw);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800963
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700964 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
965 barrierFutures.put(BarrierInfo.create(sw,msg), future);
966
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700967 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800968
Naoki Shiotac1601d32013-11-20 10:47:34 -0800969 return future;
970 }
971
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700972 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
973 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
974 msg.setXid(sw.getNextTransactionId());
975
976 return msg;
977 }
978
Naoki Shiotae3199732013-11-25 16:14:43 -0800979 /**
980 * Get a queue attached to a switch.
981 * @param sw Switch object
982 * @return Queue object
983 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800984 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800985 if (sw == null) {
986 return null;
987 }
988
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700989 FlowPusherThread th = getProcessingThread(sw);
990 if (th == null) {
991 return null;
992 }
993
Naoki Shiota05334692014-03-18 16:06:36 -0700994 return th.assignedQueues.get(sw);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800995 }
996
Naoki Shiotae3199732013-11-25 16:14:43 -0800997 /**
998 * Get a hash value correspondent to a switch.
999 * @param sw Switch object
1000 * @return Hash value
1001 */
Naoki Shiotac1601d32013-11-20 10:47:34 -08001002 protected long getHash(IOFSwitch sw) {
1003 // This code assumes DPID is sequentially assigned.
1004 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001005 return sw.getId() % number_thread;
1006 }
Naoki Shiotae3199732013-11-25 16:14:43 -08001007
1008 /**
1009 * Get a Thread object which processes the queue attached to a switch.
1010 * @param sw Switch object
1011 * @return Thread object
1012 */
Naoki Shiota8df97bc2014-03-13 18:42:23 -07001013 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -08001014 long hash = getHash(sw);
1015
1016 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -08001017 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001018
Naoki Shiotac1601d32013-11-20 10:47:34 -08001019 @Override
1020 public String getName() {
1021 return "flowpusher";
1022 }
1023
1024 @Override
1025 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1026 return false;
1027 }
1028
1029 @Override
1030 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1031 return false;
1032 }
1033
1034 @Override
1035 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001036 if (log.isTraceEnabled()) {
1037 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1038 }
1039
1040 if (msg.getType() != OFType.BARRIER_REPLY) {
1041 log.error("Unexpected reply message : {}", msg.getType());
Naoki Shiotac1601d32013-11-20 10:47:34 -08001042 return Command.CONTINUE;
1043 }
1044
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001045 OFBarrierReply reply = (OFBarrierReply) msg;
1046 BarrierInfo info = BarrierInfo.create(sw,reply);
Naoki Shiotac1601d32013-11-20 10:47:34 -08001047
Naoki Shiotad6ef3b32014-03-13 18:42:23 -07001048 // Deliver future if exists
1049 OFBarrierReplyFuture future = barrierFutures.get(info);
1050 if (future != null) {
1051 future.deliverFuture(sw, msg);
1052 barrierFutures.remove(info);
1053 }
Naoki Shiotac1601d32013-11-20 10:47:34 -08001054
1055 return Command.CONTINUE;
1056 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001057}