blob: a02f0d5154226a4329752f5442c18746220c9400 [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070027import net.onrc.onos.core.intent.FlowEntry;
Jonathan Hart23701d12014-04-03 10:45:48 -070028import net.onrc.onos.core.util.Pair;
Jonathan Harta99ec672014-04-03 11:30:34 -070029
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070030import org.projectfloodlight.openflow.protocol.OFBarrierReply;
31import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
32import org.projectfloodlight.openflow.protocol.OFFactory;
33import org.projectfloodlight.openflow.protocol.OFFlowMod;
34import org.projectfloodlight.openflow.protocol.OFMessage;
35import org.projectfloodlight.openflow.protocol.OFType;
Jonathan Harta99ec672014-04-03 11:30:34 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070038
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -070039import com.google.common.cache.CacheBuilder;
40
Jonathan Hartc00f5c22014-06-10 15:14:40 -070041import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
42
/**
 * FlowPusher is an implementation of IFlowPusherService. FlowPusher assigns one
 * message queue instance to each switch. The number of message processing
 * threads is configurable via the constructor, and one thread can handle
 * multiple message queues. Each queue is assigned to a thread according to the
 * hash function defined by getHash(). Each processing thread reads messages
 * from its queues and sends them to the switches in round-robin fashion. The
 * processing thread also tracks the sending rate to suppress excessive message
 * sending.
 */
Ray Milkey1584ec82014-04-10 11:58:30 -070052public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070053 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070054 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080055
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080056 // Number of messages sent to switch at once
57 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080058
Ray Milkey8e5170e2014-04-02 12:09:55 -070059 private static class SwitchQueueEntry {
60 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070061
Ray Milkey8e5170e2014-04-02 12:09:55 -070062 public SwitchQueueEntry(OFMessage msg) {
63 this.msg = msg;
64 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070065
Ray Milkey8e5170e2014-04-02 12:09:55 -070066 public OFMessage getOFMessage() {
67 return msg;
68 }
69 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070070
Ray Milkey8e5170e2014-04-02 12:09:55 -070071 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070072 * SwitchQueue represents message queue attached to a switch. This consists
73 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -070074 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -070075 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -070076 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -070077 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080078
Ray Milkey8e5170e2014-04-02 12:09:55 -070079 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070080 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -070081 long lastSentTime = 0;
82 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070083
Ray Milkey8e5170e2014-04-02 12:09:55 -070084 // "To be deleted" flag
85 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080086
Ray Milkey8e5170e2014-04-02 12:09:55 -070087 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -070088 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -070089 MsgPriority.values().length);
90 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -070091 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -070092 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -080093
Ray Milkey8e5170e2014-04-02 12:09:55 -070094 state = QueueState.READY;
95 }
Naoki Shiotae3199732013-11-25 16:14:43 -080096
Ray Milkey8e5170e2014-04-02 12:09:55 -070097 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -070098 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070099 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700100 * @param current Current time
101 * @return true if within the rate
102 */
103 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700104 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700105 // no limitation
106 return true;
107 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800108
Ray Milkey2476cac2014-04-08 11:03:21 -0700109 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700110 return false;
111 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800112
Ray Milkey8e5170e2014-04-02 12:09:55 -0700113 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700114 long rate = lastSentSize / (current - lastSentTime);
115 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700116 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800117
Ray Milkey8e5170e2014-04-02 12:09:55 -0700118 /**
119 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700120 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700121 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700122 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700123 */
124 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700125 lastSentTime = current;
126 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700127 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700128
Ray Milkey8e5170e2014-04-02 12:09:55 -0700129 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
130 Queue<SwitchQueueEntry> queue = getQueue(priority);
131 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700132 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700133 return false;
134 }
135 return queue.add(entry);
136 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800137
Ray Milkey8e5170e2014-04-02 12:09:55 -0700138 /**
139 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700140 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700141 * @return Entry object.
142 */
143 SwitchQueueEntry poll() {
144 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700145 case READY: {
146 for (int i = 0; i < rawQueues.size(); ++i) {
147 SwitchQueueEntry entry = rawQueues.get(i).poll();
148 if (entry != null) {
149 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700150 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700151 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800152
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700153 return null;
154 }
155 case SUSPENDED: {
156 // Only polling from high priority queue
157 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
158 return entry;
159 }
160 default:
161 log.error("Unexpected QueueState: {}", state);
162 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700163 }
164 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800165
Ray Milkey8e5170e2014-04-02 12:09:55 -0700166 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700167 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700168 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700169 * @return True if there are some messages to be sent.
170 */
171 boolean hasMessageToSend() {
172 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700173 case READY:
174 for (Queue<SwitchQueueEntry> queue : rawQueues) {
175 if (!queue.isEmpty()) {
176 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700177 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700178 }
179 break;
180 case SUSPENDED:
181 // Only checking high priority queue
182 return (!getQueue(MsgPriority.HIGH).isEmpty());
183 default:
184 log.error("Unexpected QueueState: {}", state);
185 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700186 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800187
Ray Milkey8e5170e2014-04-02 12:09:55 -0700188 return false;
189 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800190
Ray Milkey8e5170e2014-04-02 12:09:55 -0700191 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700192 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700193 }
194 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800195
Ray Milkey8e5170e2014-04-02 12:09:55 -0700196 /**
197 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700198 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700199 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700200 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700201 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800202
Ray Milkey8e5170e2014-04-02 12:09:55 -0700203 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
204 return new BarrierInfo(sw.getId(), req.getXid());
205 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800206
Ray Milkey8e5170e2014-04-02 12:09:55 -0700207 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
208 return new BarrierInfo(sw.getId(), rpy.getXid());
209 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800210
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700211 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700212 this.dpid = dpid;
213 this.xid = xid;
214 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800215
Ray Milkey8e5170e2014-04-02 12:09:55 -0700216 // Auto generated code by Eclipse
217 @Override
218 public int hashCode() {
219 final int prime = 31;
220 int result = 1;
221 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700222 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700223 return result;
224 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800225
Ray Milkey8e5170e2014-04-02 12:09:55 -0700226 @Override
227 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700228 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700229 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700230 }
231 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700233 }
234 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700235 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700236 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800237
Ray Milkey8e5170e2014-04-02 12:09:55 -0700238 BarrierInfo other = (BarrierInfo) obj;
239 return (this.dpid == other.dpid) && (this.xid == other.xid);
240 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800241
Ray Milkey8e5170e2014-04-02 12:09:55 -0700242 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800243
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700244 private FloodlightModuleContext context = null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700245 private IThreadPoolService threadPool = null;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700246 private IFloodlightProviderService floodlightProvider = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800247
Ray Milkey8e5170e2014-04-02 12:09:55 -0700248 // Map of threads versus dpid
249 private Map<Long, FlowPusherThread> threadMap = null;
250 // Map from (DPID and transaction ID) to Future objects.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700251 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
252 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800253
Ray Milkey2476cac2014-04-08 11:03:21 -0700254 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800255
Ray Milkey8e5170e2014-04-02 12:09:55 -0700256 /**
257 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700258 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700259 private static class FlowPusherThread extends Thread {
260 // Weak ConcurrentHashMap
261 private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
262 .weakKeys()
263 .<IOFSwitch, SwitchQueue>build().asMap();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800264
Ray Milkey8e5170e2014-04-02 12:09:55 -0700265 final Lock queuingLock = new ReentrantLock();
266 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800267
Ray Milkey8e5170e2014-04-02 12:09:55 -0700268 @Override
269 public void run() {
270 this.setName("FlowPusherThread " + this.getId());
271 while (true) {
272 while (!queuesHasMessageToSend()) {
273 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800274
Ray Milkey8e5170e2014-04-02 12:09:55 -0700275 try {
276 // wait for message pushed to queue
277 messagePushed.await();
278 } catch (InterruptedException e) {
279 // Interrupted to be shut down (not an error)
280 log.debug("FlowPusherThread is interrupted");
281 return;
282 } finally {
283 queuingLock.unlock();
284 }
285 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800286
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700287 for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
288 .entrySet().iterator(); it.hasNext();) {
Naoki Shiota47993102014-07-09 14:00:54 -0700289 Entry<IOFSwitch, SwitchQueue> entry = it.next();
290 IOFSwitch sw = entry.getKey();
291 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800292
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700293 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700294 continue;
295 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800296
Ray Milkey8e5170e2014-04-02 12:09:55 -0700297 synchronized (queue) {
298 processQueue(sw, queue, MAX_MESSAGE_SEND);
299 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
300 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700301 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700302 }
303 }
304 }
305 }
306 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800307
Ray Milkey8e5170e2014-04-02 12:09:55 -0700308 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700309 * Read messages from queue and send them to the switch. If number of
310 * messages excess the limit, stop sending messages.
311 * <p>
312 * @param sw Switch to which messages will be sent.
313 * @param queue Queue of messages.
314 * @param maxMsg Limitation of number of messages to be sent. If set to
315 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700316 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700317 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700318 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700319 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700320 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800321
Naoki Shiota47993102014-07-09 14:00:54 -0700322 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700323 int i = 0;
324 while (queue.hasMessageToSend()) {
325 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700326 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700327 break;
328 }
329 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800330
Ray Milkey8e5170e2014-04-02 12:09:55 -0700331 SwitchQueueEntry queueEntry;
332 synchronized (queue) {
333 queueEntry = queue.poll();
334 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800335
Ray Milkey8e5170e2014-04-02 12:09:55 -0700336 OFMessage msg = queueEntry.getOFMessage();
337 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700338 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700339 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700340 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700341 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700342 // TODO BOC how do we get the size?
343 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700344 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700345 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700346 }
347 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800348
Ray Milkey8e5170e2014-04-02 12:09:55 -0700349 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700350 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700351 }
352 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800353
Ray Milkey8e5170e2014-04-02 12:09:55 -0700354 private boolean queuesHasMessageToSend() {
355 for (SwitchQueue queue : assignedQueues.values()) {
356 if (queue.hasMessageToSend()) {
357 return true;
358 }
359 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700360
Ray Milkey8e5170e2014-04-02 12:09:55 -0700361 return false;
362 }
Naoki Shiota05334692014-03-18 16:06:36 -0700363
Ray Milkey8e5170e2014-04-02 12:09:55 -0700364 private void notifyMessagePushed() {
365 queuingLock.lock();
366 try {
367 messagePushed.signal();
368 } finally {
369 queuingLock.unlock();
370 }
371 }
372 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700373
Ray Milkey8e5170e2014-04-02 12:09:55 -0700374 /**
375 * Initialize object with one thread.
376 */
377 public FlowPusher() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700378 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700379 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800380
Ray Milkey8e5170e2014-04-02 12:09:55 -0700381 /**
382 * Initialize object with threads of given number.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700383 * <p>
Ray Milkey9526d6f2014-04-10 14:54:15 -0700384 * @param numberThreadValue Number of threads to handle messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700385 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700386 public FlowPusher(int numberThreadValue) {
387 if (numberThreadValue > 0) {
388 numberThread = numberThreadValue;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700389 } else {
Ray Milkey9526d6f2014-04-10 14:54:15 -0700390 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700391 }
392 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800393
Ray Milkey8e5170e2014-04-02 12:09:55 -0700394 /**
395 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700396 * <p>
397 * @param floodlightContext FloodlightModuleContext used for acquiring
398 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700399 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700400 public void init(FloodlightModuleContext floodlightContext) {
401 this.context = floodlightContext;
402 this.floodlightProvider = context
403 .getServiceImpl(IFloodlightProviderService.class);
404 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800405
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700406 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700407 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800408
Ray Milkey8e5170e2014-04-02 12:09:55 -0700409 /**
410 * Begin processing queue.
411 */
412 public void start() {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700413 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700414 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700415 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800416
Ray Milkey8e5170e2014-04-02 12:09:55 -0700417 threadMap.put(i, thread);
418 thread.start();
419 }
420 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800421
Ray Milkey8e5170e2014-04-02 12:09:55 -0700422 @Override
423 public boolean suspend(IOFSwitch sw) {
424 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700425
Ray Milkey8e5170e2014-04-02 12:09:55 -0700426 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700427 // create queue in case suspend is called before first message
428 // addition
Ray Milkey8e5170e2014-04-02 12:09:55 -0700429 queue = createQueueImpl(sw);
430 }
431
432 synchronized (queue) {
433 if (queue.state == QueueState.READY) {
434 queue.state = QueueState.SUSPENDED;
435 return true;
436 }
437 return false;
438 }
439 }
440
441 @Override
442 public boolean resume(IOFSwitch sw) {
443 SwitchQueue queue = getQueue(sw);
444
445 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700446 log.error("No queue is attached to DPID: {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700447 return false;
448 }
449
450 synchronized (queue) {
451 if (queue.state == QueueState.SUSPENDED) {
452 queue.state = QueueState.READY;
453
454 // Free the latch if queue has any messages
455 FlowPusherThread thread = getProcessingThread(sw);
456 if (queue.hasMessageToSend()) {
457 thread.notifyMessagePushed();
458 }
459 return true;
460 }
461 return false;
462 }
463 }
464
465 @Override
466 public QueueState getState(IOFSwitch sw) {
467 SwitchQueue queue = getQueue(sw);
468
469 if (queue == null) {
470 return QueueState.UNKNOWN;
471 }
472
473 return queue.state;
474 }
475
476 /**
477 * Stop processing queue and exit thread.
478 */
479 public void stop() {
480 if (threadMap == null) {
481 return;
482 }
483
484 for (FlowPusherThread t : threadMap.values()) {
485 t.interrupt();
486 }
487 }
488
489 @Override
490 public void setRate(IOFSwitch sw, long rate) {
491 SwitchQueue queue = getQueue(sw);
492 if (queue == null) {
493 queue = createQueueImpl(sw);
494 }
495
496 if (rate > 0) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700497 log.debug("rate for {} is set to {}", sw.getStringId(), rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700498 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700499 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700500 }
501 }
502 }
503
504 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700505 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700506 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700507 public boolean createQueue(IOFSwitch sw) {
508 SwitchQueue queue = createQueueImpl(sw);
509
510 return (queue != null);
511 }
512
513 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
514 SwitchQueue queue = getQueue(sw);
515 if (queue != null) {
516 return queue;
517 }
518
519 FlowPusherThread proc = getProcessingThread(sw);
520 queue = new SwitchQueue();
521 queue.state = QueueState.READY;
522 proc.assignedQueues.put(sw, queue);
523
524 return queue;
525 }
526
527 @Override
528 public boolean deleteQueue(IOFSwitch sw) {
529 return deleteQueue(sw, false);
530 }
531
532 @Override
533 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
534 FlowPusherThread proc = getProcessingThread(sw);
535
536 if (forceStop) {
537 SwitchQueue queue = proc.assignedQueues.remove(sw);
538 if (queue == null) {
539 return false;
540 }
541 return true;
542 } else {
543 SwitchQueue queue = getQueue(sw);
544 if (queue == null) {
545 return false;
546 }
547 synchronized (queue) {
548 queue.toBeDeleted = true;
549 }
550 return true;
551 }
552 }
553
554 @Override
555 public boolean add(IOFSwitch sw, OFMessage msg) {
556 return add(sw, msg, MsgPriority.NORMAL);
557 }
558
559 @Override
560 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
561 return addMessageImpl(sw, msg, priority);
562 }
563
564 @Override
565 public void pushFlowEntries(
566 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
567 pushFlowEntries(entries, MsgPriority.NORMAL);
568 }
569
570 @Override
571 public void pushFlowEntries(
572 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
573
574 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Sho SHIMIZU26d77892014-06-10 11:07:06 -0700575 add(entry.getFirst(), entry.getSecond(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700576 }
577 }
578
579 @Override
580 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
581 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
582 }
583
584 @Override
585 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
586 Collection<Pair<IOFSwitch, FlowEntry>> entries =
587 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
588
589 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
590 pushFlowEntries(entries, priority);
591 }
592
593 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700594 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700595 * <p>
596 * @param sw Switch to which message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700597 * @param flowEntry FlowEntry object used for creating message.
598 * @return true if message is successfully added to a queue.
599 */
600 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
601 //
602 // Create the OpenFlow Flow Modification Entry to push
603 //
Jonathan Harta213bce2014-08-11 15:44:07 -0700604 OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700605 // log.trace("Pushing flow mod {}", fm);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700606 return addMessageImpl(sw, fm, priority);
607 }
608
609 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700610 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700611 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700612 * @param sw
613 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700614 * @param priority
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700615 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700616 */
617 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
618 FlowPusherThread thread = getProcessingThread(sw);
619
620 SwitchQueue queue = getQueue(sw);
621
622 // create queue at first addition of message
623 if (queue == null) {
624 queue = createQueueImpl(sw);
625 }
626
627 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
628
629 synchronized (queue) {
630 queue.add(entry, priority);
631 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700632 log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700633 }
634 }
635
636 thread.notifyMessagePushed();
637
638 return true;
639 }
640
641 @Override
642 public OFBarrierReply barrier(IOFSwitch sw) {
643 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
644 if (future == null) {
645 return null;
646 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700647 try {
648 return future.get();
649 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700650 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700651 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700652 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700653 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700654 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700655 }
656
657 @Override
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700658 public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700659 // TODO creation of message and future should be moved to OFSwitchImpl
660
661 if (sw == null) {
662 return null;
663 }
664
665 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700666 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
667 (int) msg.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700668 barrierFutures.put(BarrierInfo.create(sw, msg), future);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700669 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700670 return future;
671 }
672
673 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Jonathan Harta213bce2014-08-11 15:44:07 -0700674 OFFactory factory = sw.getFactory();
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700675 if (factory == null) {
676 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
677 sw.getOFVersion());
678 return null;
679 }
680 return factory.buildBarrierRequest()
681 .setXid(sw.getNextTransactionId())
682 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700683 }
684
685 /**
686 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700687 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700688 * @param sw Switch object
689 * @return Queue object
690 */
691 protected SwitchQueue getQueue(IOFSwitch sw) {
692 if (sw == null) {
693 return null;
694 }
695
696 FlowPusherThread th = getProcessingThread(sw);
697 if (th == null) {
698 return null;
699 }
700
701 return th.assignedQueues.get(sw);
702 }
703
704 /**
705 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700706 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700707 * @param sw Switch object
708 * @return Hash value
709 */
710 protected long getHash(IOFSwitch sw) {
711 // This code assumes DPID is sequentially assigned.
712 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -0700713 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700714 }
715
716 /**
717 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700718 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700719 * @param sw Switch object
720 * @return Thread object
721 */
722 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
723 long hash = getHash(sw);
724
725 return threadMap.get(hash);
726 }
727
728 @Override
729 public String getName() {
730 return "flowpusher";
731 }
732
733 @Override
734 public boolean isCallbackOrderingPrereq(OFType type, String name) {
735 return false;
736 }
737
738 @Override
739 public boolean isCallbackOrderingPostreq(OFType type, String name) {
740 return false;
741 }
742
743 @Override
744 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
745 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700746 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700747 }
748
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700749 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700750 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700751 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700752 return Command.CONTINUE;
753 }
754
755 OFBarrierReply reply = (OFBarrierReply) msg;
756 BarrierInfo info = BarrierInfo.create(sw, reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700757 // Deliver future if exists
758 OFBarrierReplyFuture future = barrierFutures.get(info);
759 if (future != null) {
760 future.deliverFuture(sw, msg);
761 barrierFutures.remove(info);
762 }
763
764 return Command.CONTINUE;
765 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700766
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700767}