blob: c40eac04ce95385deccc5921e55d57981ecb2c49 [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070027import net.onrc.onos.core.intent.FlowEntry;
Jonathan Harta99ec672014-04-03 11:30:34 -070028
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -070029import org.apache.commons.lang3.tuple.Pair;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070030import org.projectfloodlight.openflow.protocol.OFBarrierReply;
31import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
32import org.projectfloodlight.openflow.protocol.OFFactory;
33import org.projectfloodlight.openflow.protocol.OFFlowMod;
34import org.projectfloodlight.openflow.protocol.OFMessage;
35import org.projectfloodlight.openflow.protocol.OFType;
Jonathan Harta99ec672014-04-03 11:30:34 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070038
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -070039import com.google.common.cache.CacheBuilder;
40
Jonathan Hartc00f5c22014-06-10 15:14:40 -070041import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
42
/**
 * FlowPusher is an implementation of FlowPusherService. FlowPusher assigns
 * one message queue instance to each switch. The number of message processing
 * threads is configurable via the constructor, and one thread can handle
 * multiple message queues. Each queue is assigned to a thread according to
 * the hash function defined by getHash(). Each processing thread reads
 * messages from its queues and sends them to the switches in round-robin
 * order. The processing thread also calculates the sending rate in order to
 * suppress excessive message sending.
 */
Ray Milkey1584ec82014-04-10 11:58:30 -070052public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070053 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070054 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080055
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080056 // Number of messages sent to switch at once
57 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080058
Ray Milkey8e5170e2014-04-02 12:09:55 -070059 private static class SwitchQueueEntry {
60 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070061
Ray Milkey8e5170e2014-04-02 12:09:55 -070062 public SwitchQueueEntry(OFMessage msg) {
63 this.msg = msg;
64 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070065
Ray Milkey8e5170e2014-04-02 12:09:55 -070066 public OFMessage getOFMessage() {
67 return msg;
68 }
69 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070070
Ray Milkey8e5170e2014-04-02 12:09:55 -070071 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070072 * SwitchQueue represents message queue attached to a switch. This consists
73 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -070074 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -070075 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -070076 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -070077 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080078
Ray Milkey8e5170e2014-04-02 12:09:55 -070079 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070080 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -070081 long lastSentTime = 0;
82 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070083
Ray Milkey8e5170e2014-04-02 12:09:55 -070084 // "To be deleted" flag
85 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080086
Ray Milkey8e5170e2014-04-02 12:09:55 -070087 SwitchQueue() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -070088 rawQueues = new ArrayList<>(MsgPriority.values().length);
Ray Milkey8e5170e2014-04-02 12:09:55 -070089 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -070090 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -070091 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -080092
Ray Milkey8e5170e2014-04-02 12:09:55 -070093 state = QueueState.READY;
94 }
Naoki Shiotae3199732013-11-25 16:14:43 -080095
Ray Milkey8e5170e2014-04-02 12:09:55 -070096 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -070097 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070098 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -070099 * @param current Current time
100 * @return true if within the rate
101 */
102 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700103 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700104 // no limitation
105 return true;
106 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800107
Ray Milkey2476cac2014-04-08 11:03:21 -0700108 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700109 return false;
110 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800111
Ray Milkey8e5170e2014-04-02 12:09:55 -0700112 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700113 long rate = lastSentSize / (current - lastSentTime);
114 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700115 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800116
Ray Milkey8e5170e2014-04-02 12:09:55 -0700117 /**
118 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700119 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700120 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700121 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700122 */
123 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700124 lastSentTime = current;
125 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700126 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700127
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
129 Queue<SwitchQueueEntry> queue = getQueue(priority);
130 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700131 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700132 return false;
133 }
134 return queue.add(entry);
135 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800136
Ray Milkey8e5170e2014-04-02 12:09:55 -0700137 /**
138 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700139 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700140 * @return Entry object.
141 */
142 SwitchQueueEntry poll() {
143 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700144 case READY: {
145 for (int i = 0; i < rawQueues.size(); ++i) {
146 SwitchQueueEntry entry = rawQueues.get(i).poll();
147 if (entry != null) {
148 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700149 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700150 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800151
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700152 return null;
153 }
154 case SUSPENDED: {
155 // Only polling from high priority queue
156 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
157 return entry;
158 }
159 default:
160 log.error("Unexpected QueueState: {}", state);
161 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700162 }
163 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800164
Ray Milkey8e5170e2014-04-02 12:09:55 -0700165 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700166 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700167 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700168 * @return True if there are some messages to be sent.
169 */
170 boolean hasMessageToSend() {
171 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700172 case READY:
173 for (Queue<SwitchQueueEntry> queue : rawQueues) {
174 if (!queue.isEmpty()) {
175 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700176 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700177 }
178 break;
179 case SUSPENDED:
180 // Only checking high priority queue
181 return (!getQueue(MsgPriority.HIGH).isEmpty());
182 default:
183 log.error("Unexpected QueueState: {}", state);
184 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700185 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800186
Ray Milkey8e5170e2014-04-02 12:09:55 -0700187 return false;
188 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800189
Ray Milkey8e5170e2014-04-02 12:09:55 -0700190 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700191 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700192 }
193 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800194
Ray Milkey8e5170e2014-04-02 12:09:55 -0700195 /**
196 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700197 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700198 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700199 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700200 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800201
Ray Milkey8e5170e2014-04-02 12:09:55 -0700202 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
203 return new BarrierInfo(sw.getId(), req.getXid());
204 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800205
Ray Milkey8e5170e2014-04-02 12:09:55 -0700206 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
207 return new BarrierInfo(sw.getId(), rpy.getXid());
208 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800209
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700210 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700211 this.dpid = dpid;
212 this.xid = xid;
213 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800214
Ray Milkey8e5170e2014-04-02 12:09:55 -0700215 // Auto generated code by Eclipse
216 @Override
217 public int hashCode() {
218 final int prime = 31;
219 int result = 1;
220 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700221 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700222 return result;
223 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800224
Ray Milkey8e5170e2014-04-02 12:09:55 -0700225 @Override
226 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700227 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700228 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700229 }
230 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700231 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700232 }
233 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700234 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700235 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800236
Ray Milkey8e5170e2014-04-02 12:09:55 -0700237 BarrierInfo other = (BarrierInfo) obj;
238 return (this.dpid == other.dpid) && (this.xid == other.xid);
239 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800240
Ray Milkey8e5170e2014-04-02 12:09:55 -0700241 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800242
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700243 private FloodlightModuleContext context = null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700244 private IThreadPoolService threadPool = null;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700245 private IFloodlightProviderService floodlightProvider = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800246
Ray Milkey8e5170e2014-04-02 12:09:55 -0700247 // Map of threads versus dpid
248 private Map<Long, FlowPusherThread> threadMap = null;
249 // Map from (DPID and transaction ID) to Future objects.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700250 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
251 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800252
Ray Milkey2476cac2014-04-08 11:03:21 -0700253 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800254
Ray Milkey8e5170e2014-04-02 12:09:55 -0700255 /**
256 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700257 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700258 private static class FlowPusherThread extends Thread {
259 // Weak ConcurrentHashMap
260 private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
261 .weakKeys()
262 .<IOFSwitch, SwitchQueue>build().asMap();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800263
Ray Milkey8e5170e2014-04-02 12:09:55 -0700264 final Lock queuingLock = new ReentrantLock();
265 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800266
Ray Milkey8e5170e2014-04-02 12:09:55 -0700267 @Override
268 public void run() {
269 this.setName("FlowPusherThread " + this.getId());
270 while (true) {
271 while (!queuesHasMessageToSend()) {
272 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800273
Ray Milkey8e5170e2014-04-02 12:09:55 -0700274 try {
275 // wait for message pushed to queue
276 messagePushed.await();
277 } catch (InterruptedException e) {
278 // Interrupted to be shut down (not an error)
279 log.debug("FlowPusherThread is interrupted");
280 return;
281 } finally {
282 queuingLock.unlock();
283 }
284 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800285
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700286 for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
287 .entrySet().iterator(); it.hasNext();) {
Naoki Shiota47993102014-07-09 14:00:54 -0700288 Entry<IOFSwitch, SwitchQueue> entry = it.next();
289 IOFSwitch sw = entry.getKey();
290 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800291
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700292 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700293 continue;
294 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800295
Ray Milkey8e5170e2014-04-02 12:09:55 -0700296 synchronized (queue) {
297 processQueue(sw, queue, MAX_MESSAGE_SEND);
298 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
299 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700300 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700301 }
302 }
303 }
304 }
305 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800306
Ray Milkey8e5170e2014-04-02 12:09:55 -0700307 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700308 * Read messages from queue and send them to the switch. If number of
309 * messages excess the limit, stop sending messages.
310 * <p>
311 * @param sw Switch to which messages will be sent.
312 * @param queue Queue of messages.
313 * @param maxMsg Limitation of number of messages to be sent. If set to
314 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700315 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700316 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700317 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700318 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700319 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800320
Naoki Shiota47993102014-07-09 14:00:54 -0700321 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700322 int i = 0;
323 while (queue.hasMessageToSend()) {
324 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700325 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700326 break;
327 }
328 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800329
Ray Milkey8e5170e2014-04-02 12:09:55 -0700330 SwitchQueueEntry queueEntry;
331 synchronized (queue) {
332 queueEntry = queue.poll();
333 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800334
Ray Milkey8e5170e2014-04-02 12:09:55 -0700335 OFMessage msg = queueEntry.getOFMessage();
336 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700337 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700339 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700340 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700341 // TODO BOC how do we get the size?
342 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700343 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700344 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700345 }
346 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800347
Ray Milkey8e5170e2014-04-02 12:09:55 -0700348 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700349 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700350 }
351 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800352
Ray Milkey8e5170e2014-04-02 12:09:55 -0700353 private boolean queuesHasMessageToSend() {
354 for (SwitchQueue queue : assignedQueues.values()) {
355 if (queue.hasMessageToSend()) {
356 return true;
357 }
358 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700359
Ray Milkey8e5170e2014-04-02 12:09:55 -0700360 return false;
361 }
Naoki Shiota05334692014-03-18 16:06:36 -0700362
Ray Milkey8e5170e2014-04-02 12:09:55 -0700363 private void notifyMessagePushed() {
364 queuingLock.lock();
365 try {
366 messagePushed.signal();
367 } finally {
368 queuingLock.unlock();
369 }
370 }
371 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700372
/**
 * Creates a FlowPusher that uses the default number of processing threads.
 */
public FlowPusher() {
    this.numberThread = DEFAULT_NUMBER_THREAD;
}
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800379
/**
 * Creates a FlowPusher with the given number of processing threads.
 * A non-positive value falls back to the default number of threads.
 * <p>
 * @param numberThreadValue Number of threads to handle messages.
 */
public FlowPusher(int numberThreadValue) {
    numberThread = (numberThreadValue > 0) ? numberThreadValue : DEFAULT_NUMBER_THREAD;
}
Naoki Shiota81dbe302013-11-21 15:35:38 -0800392
Ray Milkey8e5170e2014-04-02 12:09:55 -0700393 /**
394 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700395 * <p>
396 * @param floodlightContext FloodlightModuleContext used for acquiring
397 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700398 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700399 public void init(FloodlightModuleContext floodlightContext) {
400 this.context = floodlightContext;
401 this.floodlightProvider = context
402 .getServiceImpl(IFloodlightProviderService.class);
403 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800404
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700405 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700406 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800407
Ray Milkey8e5170e2014-04-02 12:09:55 -0700408 /**
409 * Begin processing queue.
410 */
411 public void start() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700412 threadMap = new HashMap<>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700413 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700414 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800415
Ray Milkey8e5170e2014-04-02 12:09:55 -0700416 threadMap.put(i, thread);
417 thread.start();
418 }
419 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800420
Ray Milkey8e5170e2014-04-02 12:09:55 -0700421 @Override
422 public boolean suspend(IOFSwitch sw) {
423 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700424
Ray Milkey8e5170e2014-04-02 12:09:55 -0700425 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700426 // create queue in case suspend is called before first message
427 // addition
Ray Milkey8e5170e2014-04-02 12:09:55 -0700428 queue = createQueueImpl(sw);
429 }
430
431 synchronized (queue) {
432 if (queue.state == QueueState.READY) {
433 queue.state = QueueState.SUSPENDED;
434 return true;
435 }
436 return false;
437 }
438 }
439
440 @Override
441 public boolean resume(IOFSwitch sw) {
442 SwitchQueue queue = getQueue(sw);
443
444 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700445 log.error("No queue is attached to DPID: {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700446 return false;
447 }
448
449 synchronized (queue) {
450 if (queue.state == QueueState.SUSPENDED) {
451 queue.state = QueueState.READY;
452
453 // Free the latch if queue has any messages
454 FlowPusherThread thread = getProcessingThread(sw);
455 if (queue.hasMessageToSend()) {
456 thread.notifyMessagePushed();
457 }
458 return true;
459 }
460 return false;
461 }
462 }
463
464 @Override
465 public QueueState getState(IOFSwitch sw) {
466 SwitchQueue queue = getQueue(sw);
467
468 if (queue == null) {
469 return QueueState.UNKNOWN;
470 }
471
472 return queue.state;
473 }
474
475 /**
476 * Stop processing queue and exit thread.
477 */
478 public void stop() {
479 if (threadMap == null) {
480 return;
481 }
482
483 for (FlowPusherThread t : threadMap.values()) {
484 t.interrupt();
485 }
486 }
487
488 @Override
489 public void setRate(IOFSwitch sw, long rate) {
490 SwitchQueue queue = getQueue(sw);
491 if (queue == null) {
492 queue = createQueueImpl(sw);
493 }
494
495 if (rate > 0) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700496 log.debug("rate for {} is set to {}", sw.getStringId(), rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700497 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700498 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700499 }
500 }
501 }
502
503 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700504 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700505 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700506 public boolean createQueue(IOFSwitch sw) {
507 SwitchQueue queue = createQueueImpl(sw);
508
509 return (queue != null);
510 }
511
512 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
513 SwitchQueue queue = getQueue(sw);
514 if (queue != null) {
515 return queue;
516 }
517
518 FlowPusherThread proc = getProcessingThread(sw);
519 queue = new SwitchQueue();
520 queue.state = QueueState.READY;
521 proc.assignedQueues.put(sw, queue);
522
523 return queue;
524 }
525
526 @Override
527 public boolean deleteQueue(IOFSwitch sw) {
528 return deleteQueue(sw, false);
529 }
530
531 @Override
532 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
533 FlowPusherThread proc = getProcessingThread(sw);
534
535 if (forceStop) {
536 SwitchQueue queue = proc.assignedQueues.remove(sw);
537 if (queue == null) {
538 return false;
539 }
540 return true;
541 } else {
542 SwitchQueue queue = getQueue(sw);
543 if (queue == null) {
544 return false;
545 }
546 synchronized (queue) {
547 queue.toBeDeleted = true;
548 }
549 return true;
550 }
551 }
552
553 @Override
554 public boolean add(IOFSwitch sw, OFMessage msg) {
555 return add(sw, msg, MsgPriority.NORMAL);
556 }
557
558 @Override
559 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
560 return addMessageImpl(sw, msg, priority);
561 }
562
563 @Override
564 public void pushFlowEntries(
565 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
566 pushFlowEntries(entries, MsgPriority.NORMAL);
567 }
568
569 @Override
570 public void pushFlowEntries(
571 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
572
573 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700574 add(entry.getLeft(), entry.getRight(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700575 }
576 }
577
578 @Override
579 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
580 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
581 }
582
583 @Override
584 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700585 Collection<Pair<IOFSwitch, FlowEntry>> entries = new LinkedList<>();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700586
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700587 entries.add(Pair.of(sw, flowEntry));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700588 pushFlowEntries(entries, priority);
589 }
590
591 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700592 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700593 * <p>
594 * @param sw Switch to which message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700595 * @param flowEntry FlowEntry object used for creating message.
596 * @return true if message is successfully added to a queue.
597 */
598 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
599 //
600 // Create the OpenFlow Flow Modification Entry to push
601 //
Jonathan Harta213bce2014-08-11 15:44:07 -0700602 OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700603 // log.trace("Pushing flow mod {}", fm);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700604 return addMessageImpl(sw, fm, priority);
605 }
606
607 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700608 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700609 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700610 * @param sw
611 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700612 * @param priority
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700613 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700614 */
615 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
616 FlowPusherThread thread = getProcessingThread(sw);
617
618 SwitchQueue queue = getQueue(sw);
619
620 // create queue at first addition of message
621 if (queue == null) {
622 queue = createQueueImpl(sw);
623 }
624
625 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
626
627 synchronized (queue) {
628 queue.add(entry, priority);
629 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700630 log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700631 }
632 }
633
634 thread.notifyMessagePushed();
635
636 return true;
637 }
638
639 @Override
640 public OFBarrierReply barrier(IOFSwitch sw) {
641 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
642 if (future == null) {
643 return null;
644 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700645 try {
646 return future.get();
647 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700648 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700649 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700650 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700651 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700652 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700653 }
654
655 @Override
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700656 public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700657 // TODO creation of message and future should be moved to OFSwitchImpl
658
659 if (sw == null) {
660 return null;
661 }
662
663 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700664 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
665 (int) msg.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700666 barrierFutures.put(BarrierInfo.create(sw, msg), future);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700667 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700668 return future;
669 }
670
671 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Jonathan Harta213bce2014-08-11 15:44:07 -0700672 OFFactory factory = sw.getFactory();
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700673 if (factory == null) {
674 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
675 sw.getOFVersion());
676 return null;
677 }
678 return factory.buildBarrierRequest()
679 .setXid(sw.getNextTransactionId())
680 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700681 }
682
683 /**
684 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700685 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700686 * @param sw Switch object
687 * @return Queue object
688 */
689 protected SwitchQueue getQueue(IOFSwitch sw) {
690 if (sw == null) {
691 return null;
692 }
693
694 FlowPusherThread th = getProcessingThread(sw);
695 if (th == null) {
696 return null;
697 }
698
699 return th.assignedQueues.get(sw);
700 }
701
702 /**
703 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700704 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700705 * @param sw Switch object
706 * @return Hash value
707 */
708 protected long getHash(IOFSwitch sw) {
709 // This code assumes DPID is sequentially assigned.
710 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -0700711 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700712 }
713
714 /**
715 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700716 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700717 * @param sw Switch object
718 * @return Thread object
719 */
720 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
721 long hash = getHash(sw);
722
723 return threadMap.get(hash);
724 }
725
726 @Override
727 public String getName() {
728 return "flowpusher";
729 }
730
731 @Override
732 public boolean isCallbackOrderingPrereq(OFType type, String name) {
733 return false;
734 }
735
736 @Override
737 public boolean isCallbackOrderingPostreq(OFType type, String name) {
738 return false;
739 }
740
741 @Override
742 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
743 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700744 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700745 }
746
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700747 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700748 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700749 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700750 return Command.CONTINUE;
751 }
752
753 OFBarrierReply reply = (OFBarrierReply) msg;
754 BarrierInfo info = BarrierInfo.create(sw, reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700755 // Deliver future if exists
756 OFBarrierReplyFuture future = barrierFutures.get(info);
757 if (future != null) {
758 future.deliverFuture(sw, msg);
759 barrierFutures.remove(info);
760 }
761
762 return Command.CONTINUE;
763 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700764
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700765}