blob: 17bbc1a46745f9be3442bc121913d0106df3c76a [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080027import net.floodlightcontroller.util.OFMessageDamper;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070028import net.onrc.onos.core.intent.FlowEntry;
Jonathan Hart23701d12014-04-03 10:45:48 -070029import net.onrc.onos.core.util.Pair;
Jonathan Harta99ec672014-04-03 11:30:34 -070030
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070031import org.projectfloodlight.openflow.protocol.OFBarrierReply;
32import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
33import org.projectfloodlight.openflow.protocol.OFFactory;
34import org.projectfloodlight.openflow.protocol.OFFlowMod;
35import org.projectfloodlight.openflow.protocol.OFMessage;
36import org.projectfloodlight.openflow.protocol.OFType;
37import org.projectfloodlight.openflow.protocol.OFVersion;
Jonathan Harta99ec672014-04-03 11:30:34 -070038import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070040
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -070041import com.google.common.cache.CacheBuilder;
42
Jonathan Hartc00f5c22014-06-10 15:14:40 -070043import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
44
/**
 * FlowPusher is an implementation of IFlowPusherService. FlowPusher assigns
 * one message queue instance to each switch. The number of message processing
 * threads is configurable via the constructor, and one thread can handle
 * multiple message queues. Each queue is assigned to a thread according to the
 * hash function defined by getHash(). Each processing thread reads messages
 * from its queues and sends them to the switches in round-robin order. The
 * processing thread also tracks the sending rate to suppress excessive
 * message sending.
 */
Ray Milkey1584ec82014-04-10 11:58:30 -070054public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070055 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070056 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057
Naoki Shiota7d0cf272013-11-05 10:18:12 -080058 // TODO: Values copied from elsewhere (class LearningSwitch).
59 // The local copy should go away!
60 //
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070061 protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find
62 // sweet spot
63 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Ray Milkey8e5170e2014-04-02 12:09:55 -070064
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080065 // Number of messages sent to switch at once
66 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067
Ray Milkey8e5170e2014-04-02 12:09:55 -070068 private static class SwitchQueueEntry {
69 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070070
Ray Milkey8e5170e2014-04-02 12:09:55 -070071 public SwitchQueueEntry(OFMessage msg) {
72 this.msg = msg;
73 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070074
Ray Milkey8e5170e2014-04-02 12:09:55 -070075 public OFMessage getOFMessage() {
76 return msg;
77 }
78 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070079
Ray Milkey8e5170e2014-04-02 12:09:55 -070080 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070081 * SwitchQueue represents message queue attached to a switch. This consists
82 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -070083 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -070084 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -070085 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -070086 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080087
Ray Milkey8e5170e2014-04-02 12:09:55 -070088 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070089 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -070090 long lastSentTime = 0;
91 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070092
Ray Milkey8e5170e2014-04-02 12:09:55 -070093 // "To be deleted" flag
94 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080095
Ray Milkey8e5170e2014-04-02 12:09:55 -070096 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -070097 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 MsgPriority.values().length);
99 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700100 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700101 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800102
Ray Milkey8e5170e2014-04-02 12:09:55 -0700103 state = QueueState.READY;
104 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800105
Ray Milkey8e5170e2014-04-02 12:09:55 -0700106 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700107 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700108 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700109 * @param current Current time
110 * @return true if within the rate
111 */
112 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700113 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700114 // no limitation
115 return true;
116 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800117
Ray Milkey2476cac2014-04-08 11:03:21 -0700118 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700119 return false;
120 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800121
Ray Milkey8e5170e2014-04-02 12:09:55 -0700122 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700123 long rate = lastSentSize / (current - lastSentTime);
124 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700125 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800126
Ray Milkey8e5170e2014-04-02 12:09:55 -0700127 /**
128 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700129 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700130 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700131 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700132 */
133 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700134 lastSentTime = current;
135 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700136 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700137
Ray Milkey8e5170e2014-04-02 12:09:55 -0700138 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
139 Queue<SwitchQueueEntry> queue = getQueue(priority);
140 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700141 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700142 return false;
143 }
144 return queue.add(entry);
145 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800146
Ray Milkey8e5170e2014-04-02 12:09:55 -0700147 /**
148 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700149 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700150 * @return Entry object.
151 */
152 SwitchQueueEntry poll() {
153 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700154 case READY: {
155 for (int i = 0; i < rawQueues.size(); ++i) {
156 SwitchQueueEntry entry = rawQueues.get(i).poll();
157 if (entry != null) {
158 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700159 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700160 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800161
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700162 return null;
163 }
164 case SUSPENDED: {
165 // Only polling from high priority queue
166 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
167 return entry;
168 }
169 default:
170 log.error("Unexpected QueueState: {}", state);
171 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700172 }
173 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800174
Ray Milkey8e5170e2014-04-02 12:09:55 -0700175 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700176 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700177 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700178 * @return True if there are some messages to be sent.
179 */
180 boolean hasMessageToSend() {
181 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700182 case READY:
183 for (Queue<SwitchQueueEntry> queue : rawQueues) {
184 if (!queue.isEmpty()) {
185 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700186 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700187 }
188 break;
189 case SUSPENDED:
190 // Only checking high priority queue
191 return (!getQueue(MsgPriority.HIGH).isEmpty());
192 default:
193 log.error("Unexpected QueueState: {}", state);
194 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700195 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800196
Ray Milkey8e5170e2014-04-02 12:09:55 -0700197 return false;
198 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800199
Ray Milkey8e5170e2014-04-02 12:09:55 -0700200 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700201 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700202 }
203 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800204
Ray Milkey8e5170e2014-04-02 12:09:55 -0700205 /**
206 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700207 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700208 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700209 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700210 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800211
Ray Milkey8e5170e2014-04-02 12:09:55 -0700212 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
213 return new BarrierInfo(sw.getId(), req.getXid());
214 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800215
Ray Milkey8e5170e2014-04-02 12:09:55 -0700216 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
217 return new BarrierInfo(sw.getId(), rpy.getXid());
218 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800219
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700220 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700221 this.dpid = dpid;
222 this.xid = xid;
223 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800224
Ray Milkey8e5170e2014-04-02 12:09:55 -0700225 // Auto generated code by Eclipse
226 @Override
227 public int hashCode() {
228 final int prime = 31;
229 int result = 1;
230 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700231 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 return result;
233 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800234
Ray Milkey8e5170e2014-04-02 12:09:55 -0700235 @Override
236 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700237 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700238 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700239 }
240 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700241 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700242 }
243 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700244 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700245 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800246
Ray Milkey8e5170e2014-04-02 12:09:55 -0700247 BarrierInfo other = (BarrierInfo) obj;
248 return (this.dpid == other.dpid) && (this.xid == other.xid);
249 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800250
Ray Milkey8e5170e2014-04-02 12:09:55 -0700251 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800252
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700253 private FloodlightModuleContext context = null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700254 private OFMessageDamper messageDamper = null;
255 private IThreadPoolService threadPool = null;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700256 private IFloodlightProviderService floodlightProvider = null;
257 protected Map<OFVersion, OFFactory> ofFactoryMap = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800258
Ray Milkey8e5170e2014-04-02 12:09:55 -0700259 // Map of threads versus dpid
260 private Map<Long, FlowPusherThread> threadMap = null;
261 // Map from (DPID and transaction ID) to Future objects.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700262 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
263 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800264
Ray Milkey2476cac2014-04-08 11:03:21 -0700265 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800266
Ray Milkey8e5170e2014-04-02 12:09:55 -0700267 /**
268 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700269 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700270 private static class FlowPusherThread extends Thread {
271 // Weak ConcurrentHashMap
272 private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
273 .weakKeys()
274 .<IOFSwitch, SwitchQueue>build().asMap();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800275
Ray Milkey8e5170e2014-04-02 12:09:55 -0700276 final Lock queuingLock = new ReentrantLock();
277 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800278
Ray Milkey8e5170e2014-04-02 12:09:55 -0700279 @Override
280 public void run() {
281 this.setName("FlowPusherThread " + this.getId());
282 while (true) {
283 while (!queuesHasMessageToSend()) {
284 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800285
Ray Milkey8e5170e2014-04-02 12:09:55 -0700286 try {
287 // wait for message pushed to queue
288 messagePushed.await();
289 } catch (InterruptedException e) {
290 // Interrupted to be shut down (not an error)
291 log.debug("FlowPusherThread is interrupted");
292 return;
293 } finally {
294 queuingLock.unlock();
295 }
296 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800297
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700298 for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
299 .entrySet().iterator(); it.hasNext();) {
Naoki Shiota47993102014-07-09 14:00:54 -0700300 Entry<IOFSwitch, SwitchQueue> entry = it.next();
301 IOFSwitch sw = entry.getKey();
302 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800303
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700304 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700305 continue;
306 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800307
Ray Milkey8e5170e2014-04-02 12:09:55 -0700308 synchronized (queue) {
309 processQueue(sw, queue, MAX_MESSAGE_SEND);
310 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
311 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700312 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700313 }
314 }
315 }
316 }
317 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800318
Ray Milkey8e5170e2014-04-02 12:09:55 -0700319 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700320 * Read messages from queue and send them to the switch. If number of
321 * messages excess the limit, stop sending messages.
322 * <p>
323 * @param sw Switch to which messages will be sent.
324 * @param queue Queue of messages.
325 * @param maxMsg Limitation of number of messages to be sent. If set to
326 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700327 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700328 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700329 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700330 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700331 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800332
Naoki Shiota47993102014-07-09 14:00:54 -0700333 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700334 int i = 0;
335 while (queue.hasMessageToSend()) {
336 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700337 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 break;
339 }
340 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800341
Ray Milkey8e5170e2014-04-02 12:09:55 -0700342 SwitchQueueEntry queueEntry;
343 synchronized (queue) {
344 queueEntry = queue.poll();
345 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800346
Ray Milkey8e5170e2014-04-02 12:09:55 -0700347 OFMessage msg = queueEntry.getOFMessage();
348 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700349 // TODO BOC do we need to use the message damper?
350 // messageDamper.write(sw, msg, context);
351 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700352 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700353 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700354 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700355 // TODO BOC how do we get the size?
356 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700357 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700358 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700359 }
360 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800361
Ray Milkey8e5170e2014-04-02 12:09:55 -0700362 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700363 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700364 }
365 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800366
Ray Milkey8e5170e2014-04-02 12:09:55 -0700367 private boolean queuesHasMessageToSend() {
368 for (SwitchQueue queue : assignedQueues.values()) {
369 if (queue.hasMessageToSend()) {
370 return true;
371 }
372 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700373
Ray Milkey8e5170e2014-04-02 12:09:55 -0700374 return false;
375 }
Naoki Shiota05334692014-03-18 16:06:36 -0700376
Ray Milkey8e5170e2014-04-02 12:09:55 -0700377 private void notifyMessagePushed() {
378 queuingLock.lock();
379 try {
380 messagePushed.signal();
381 } finally {
382 queuingLock.unlock();
383 }
384 }
385 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700386
/**
 * Create a pusher that uses the default number of processing threads
 * (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this.numberThread = DEFAULT_NUMBER_THREAD;
}
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800393
/**
 * Create a pusher with the requested number of processing threads.
 * <p>
 * @param numberThreadValue Number of threads to handle messages. Values
 *        that are not positive fall back to DEFAULT_NUMBER_THREAD.
 */
public FlowPusher(int numberThreadValue) {
    this.numberThread = (numberThreadValue > 0)
            ? numberThreadValue
            : DEFAULT_NUMBER_THREAD;
}
Naoki Shiota81dbe302013-11-21 15:35:38 -0800406
Ray Milkey8e5170e2014-04-02 12:09:55 -0700407 /**
408 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700409 * <p>
410 * @param floodlightContext FloodlightModuleContext used for acquiring
411 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700412 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700413 public void init(FloodlightModuleContext floodlightContext) {
414 this.context = floodlightContext;
415 this.floodlightProvider = context
416 .getServiceImpl(IFloodlightProviderService.class);
417 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
418 this.messageDamper = null;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800419
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700420 ofFactoryMap = new HashMap<>();
421 ofFactoryMap.put(OFVersion.OF_10, floodlightProvider.getOFMessageFactory_10());
422 ofFactoryMap.put(OFVersion.OF_13, floodlightProvider.getOFMessageFactory_13());
423 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
424
425 // TODO BOC message damper may not be needed...
426 // if (damper != null) {
427 // messageDamper = damper;
428 // } else {
429 // use default values
430 /*messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
431 EnumSet.of(OFType.FLOW_MOD),
432 OFMESSAGE_DAMPER_TIMEOUT);*/
433 // }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700434 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800435
Ray Milkey8e5170e2014-04-02 12:09:55 -0700436 /**
437 * Begin processing queue.
438 */
439 public void start() {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700440 // TODO BOC
441 // if (factory == null) {
442 // log.error("FlowPusher not yet initialized.");
443 // return;
444 // }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800445
Ray Milkey8e5170e2014-04-02 12:09:55 -0700446 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700447 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700448 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800449
Ray Milkey8e5170e2014-04-02 12:09:55 -0700450 threadMap.put(i, thread);
451 thread.start();
452 }
453 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800454
Ray Milkey8e5170e2014-04-02 12:09:55 -0700455 @Override
456 public boolean suspend(IOFSwitch sw) {
457 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700458
Ray Milkey8e5170e2014-04-02 12:09:55 -0700459 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700460 // create queue in case suspend is called before first message
461 // addition
Ray Milkey8e5170e2014-04-02 12:09:55 -0700462 queue = createQueueImpl(sw);
463 }
464
465 synchronized (queue) {
466 if (queue.state == QueueState.READY) {
467 queue.state = QueueState.SUSPENDED;
468 return true;
469 }
470 return false;
471 }
472 }
473
474 @Override
475 public boolean resume(IOFSwitch sw) {
476 SwitchQueue queue = getQueue(sw);
477
478 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700479 log.error("No queue is attached to DPID: {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700480 return false;
481 }
482
483 synchronized (queue) {
484 if (queue.state == QueueState.SUSPENDED) {
485 queue.state = QueueState.READY;
486
487 // Free the latch if queue has any messages
488 FlowPusherThread thread = getProcessingThread(sw);
489 if (queue.hasMessageToSend()) {
490 thread.notifyMessagePushed();
491 }
492 return true;
493 }
494 return false;
495 }
496 }
497
498 @Override
499 public QueueState getState(IOFSwitch sw) {
500 SwitchQueue queue = getQueue(sw);
501
502 if (queue == null) {
503 return QueueState.UNKNOWN;
504 }
505
506 return queue.state;
507 }
508
509 /**
510 * Stop processing queue and exit thread.
511 */
512 public void stop() {
513 if (threadMap == null) {
514 return;
515 }
516
517 for (FlowPusherThread t : threadMap.values()) {
518 t.interrupt();
519 }
520 }
521
522 @Override
523 public void setRate(IOFSwitch sw, long rate) {
524 SwitchQueue queue = getQueue(sw);
525 if (queue == null) {
526 queue = createQueueImpl(sw);
527 }
528
529 if (rate > 0) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700530 log.debug("rate for {} is set to {}", sw.getStringId(), rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700531 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700532 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700533 }
534 }
535 }
536
537 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700538 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700539 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700540 public boolean createQueue(IOFSwitch sw) {
541 SwitchQueue queue = createQueueImpl(sw);
542
543 return (queue != null);
544 }
545
546 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
547 SwitchQueue queue = getQueue(sw);
548 if (queue != null) {
549 return queue;
550 }
551
552 FlowPusherThread proc = getProcessingThread(sw);
553 queue = new SwitchQueue();
554 queue.state = QueueState.READY;
555 proc.assignedQueues.put(sw, queue);
556
557 return queue;
558 }
559
560 @Override
561 public boolean deleteQueue(IOFSwitch sw) {
562 return deleteQueue(sw, false);
563 }
564
565 @Override
566 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
567 FlowPusherThread proc = getProcessingThread(sw);
568
569 if (forceStop) {
570 SwitchQueue queue = proc.assignedQueues.remove(sw);
571 if (queue == null) {
572 return false;
573 }
574 return true;
575 } else {
576 SwitchQueue queue = getQueue(sw);
577 if (queue == null) {
578 return false;
579 }
580 synchronized (queue) {
581 queue.toBeDeleted = true;
582 }
583 return true;
584 }
585 }
586
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -0700587 /**
588 * Invalidate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700589 * <p>
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -0700590 * @param sw switch
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -0700591 * @see OFMessageDamper#invalidate(IOFSwitch)
592 */
593 public void invalidate(IOFSwitch sw) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700594 // messageDamper.invalidate(sw); currently a null ptr - commenting out
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -0700595 }
596
Ray Milkey8e5170e2014-04-02 12:09:55 -0700597 @Override
598 public boolean add(IOFSwitch sw, OFMessage msg) {
599 return add(sw, msg, MsgPriority.NORMAL);
600 }
601
602 @Override
603 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
604 return addMessageImpl(sw, msg, priority);
605 }
606
607 @Override
608 public void pushFlowEntries(
609 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
610 pushFlowEntries(entries, MsgPriority.NORMAL);
611 }
612
613 @Override
614 public void pushFlowEntries(
615 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
616
617 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Sho SHIMIZU26d77892014-06-10 11:07:06 -0700618 add(entry.getFirst(), entry.getSecond(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700619 }
620 }
621
622 @Override
623 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
624 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
625 }
626
627 @Override
628 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
629 Collection<Pair<IOFSwitch, FlowEntry>> entries =
630 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
631
632 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
633 pushFlowEntries(entries, priority);
634 }
635
636 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700637 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700638 * <p>
639 * @param sw Switch to which message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700640 * @param flowEntry FlowEntry object used for creating message.
641 * @return true if message is successfully added to a queue.
642 */
643 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
644 //
645 // Create the OpenFlow Flow Modification Entry to push
646 //
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700647 OFFlowMod fm = flowEntry.buildFlowMod(ofFactoryMap.get(sw.getOFVersion()));
648 // log.trace("Pushing flow mod {}", fm);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700649 return addMessageImpl(sw, fm, priority);
650 }
651
652 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700653 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700654 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700655 * @param sw
656 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700657 * @param priority
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700658 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700659 */
660 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
661 FlowPusherThread thread = getProcessingThread(sw);
662
663 SwitchQueue queue = getQueue(sw);
664
665 // create queue at first addition of message
666 if (queue == null) {
667 queue = createQueueImpl(sw);
668 }
669
670 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
671
672 synchronized (queue) {
673 queue.add(entry, priority);
674 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700675 log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700676 }
677 }
678
679 thread.notifyMessagePushed();
680
681 return true;
682 }
683
684 @Override
685 public OFBarrierReply barrier(IOFSwitch sw) {
686 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
687 if (future == null) {
688 return null;
689 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700690 try {
691 return future.get();
692 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700693 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700694 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700695 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700696 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700697 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700698 }
699
700 @Override
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700701 public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700702 // TODO creation of message and future should be moved to OFSwitchImpl
703
704 if (sw == null) {
705 return null;
706 }
707
708 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700709 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
710 (int) msg.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700711 barrierFutures.put(BarrierInfo.create(sw, msg), future);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700712 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700713 return future;
714 }
715
716 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700717 OFFactory factory = ofFactoryMap.get(sw.getOFVersion());
718 if (factory == null) {
719 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
720 sw.getOFVersion());
721 return null;
722 }
723 return factory.buildBarrierRequest()
724 .setXid(sw.getNextTransactionId())
725 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700726 }
727
728 /**
729 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700730 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700731 * @param sw Switch object
732 * @return Queue object
733 */
734 protected SwitchQueue getQueue(IOFSwitch sw) {
735 if (sw == null) {
736 return null;
737 }
738
739 FlowPusherThread th = getProcessingThread(sw);
740 if (th == null) {
741 return null;
742 }
743
744 return th.assignedQueues.get(sw);
745 }
746
747 /**
748 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700749 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700750 * @param sw Switch object
751 * @return Hash value
752 */
753 protected long getHash(IOFSwitch sw) {
754 // This code assumes DPID is sequentially assigned.
755 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -0700756 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700757 }
758
759 /**
760 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700761 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700762 * @param sw Switch object
763 * @return Thread object
764 */
765 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
766 long hash = getHash(sw);
767
768 return threadMap.get(hash);
769 }
770
771 @Override
772 public String getName() {
773 return "flowpusher";
774 }
775
776 @Override
777 public boolean isCallbackOrderingPrereq(OFType type, String name) {
778 return false;
779 }
780
781 @Override
782 public boolean isCallbackOrderingPostreq(OFType type, String name) {
783 return false;
784 }
785
786 @Override
787 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
788 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700789 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700790 }
791
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700792 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700793 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700794 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700795 return Command.CONTINUE;
796 }
797
798 OFBarrierReply reply = (OFBarrierReply) msg;
799 BarrierInfo info = BarrierInfo.create(sw, reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700800 // Deliver future if exists
801 OFBarrierReplyFuture future = barrierFutures.get(info);
802 if (future != null) {
803 future.deliverFuture(sw, msg);
804 barrierFutures.remove(info);
805 }
806
807 return Command.CONTINUE;
808 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700809
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700810}