blob: aa1c05704d348522df11e8f7eaa84bfa1fa96483 [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070027import net.onrc.onos.core.intent.FlowEntry;
Jonathan Hart5302b6c2014-08-13 15:57:59 -070028import net.onrc.onos.core.util.Dpid;
Jonathan Harta99ec672014-04-03 11:30:34 -070029
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -070030import org.apache.commons.lang3.tuple.Pair;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070031import org.projectfloodlight.openflow.protocol.OFBarrierReply;
32import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
33import org.projectfloodlight.openflow.protocol.OFFactory;
34import org.projectfloodlight.openflow.protocol.OFFlowMod;
35import org.projectfloodlight.openflow.protocol.OFMessage;
36import org.projectfloodlight.openflow.protocol.OFType;
Jonathan Harta99ec672014-04-03 11:30:34 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070039
Jonathan Hartc00f5c22014-06-10 15:14:40 -070040import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
41
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042/**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070043 * FlowPusher is a implementation of FlowPusherService. FlowPusher assigns one
44 * message queue instance for each one switch. Number of message processing
45 * threads is configurable by constructor, and one thread can handle multiple
46 * message queues. Each queue will be assigned to a thread according to hash
47 * function defined by getHash(). Each processing thread reads messages from
48 * queues and sends it to switches in round-robin. Processing thread also
49 * calculates rate of sending to suppress excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070050 */
Ray Milkey1584ec82014-04-10 11:58:30 -070051public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070052 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Jonathan Hart5302b6c2014-08-13 15:57:59 -070053 private static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080054
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080055 // Number of messages sent to switch at once
56 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080057
Jonathan Hart5302b6c2014-08-13 15:57:59 -070058 private FloodlightModuleContext context = null;
59 private IThreadPoolService threadPool = null;
60 private IFloodlightProviderService floodlightProvider = null;
61
62 // Map of threads versus dpid
63 private Map<Long, FlowPusherThread> threadMap = null;
64 // Map from (DPID and transaction ID) to Future objects.
65 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
66 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
67
68 private int numberThread;
69
Ray Milkey8e5170e2014-04-02 12:09:55 -070070 private static class SwitchQueueEntry {
71 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070072
Ray Milkey8e5170e2014-04-02 12:09:55 -070073 public SwitchQueueEntry(OFMessage msg) {
74 this.msg = msg;
75 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070076
Ray Milkey8e5170e2014-04-02 12:09:55 -070077 public OFMessage getOFMessage() {
78 return msg;
79 }
80 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070081
Ray Milkey8e5170e2014-04-02 12:09:55 -070082 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070083 * SwitchQueue represents message queue attached to a switch. This consists
84 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -070085 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -070086 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -070087 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -070088 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080089
Ray Milkey8e5170e2014-04-02 12:09:55 -070090 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070091 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -070092 long lastSentTime = 0;
93 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070094
Ray Milkey8e5170e2014-04-02 12:09:55 -070095 // "To be deleted" flag
96 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 SwitchQueue() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -070099 rawQueues = new ArrayList<>(MsgPriority.values().length);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700100 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700101 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700102 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800103
Ray Milkey8e5170e2014-04-02 12:09:55 -0700104 state = QueueState.READY;
105 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800106
Ray Milkey8e5170e2014-04-02 12:09:55 -0700107 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700108 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700109 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700110 * @param current Current time
111 * @return true if within the rate
112 */
113 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700114 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700115 // no limitation
116 return true;
117 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800118
Ray Milkey2476cac2014-04-08 11:03:21 -0700119 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700120 return false;
121 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800122
Ray Milkey8e5170e2014-04-02 12:09:55 -0700123 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700124 long rate = lastSentSize / (current - lastSentTime);
125 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700126 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800127
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 /**
129 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700130 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700131 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700132 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700133 */
134 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700135 lastSentTime = current;
136 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700137 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700138
Ray Milkey8e5170e2014-04-02 12:09:55 -0700139 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
140 Queue<SwitchQueueEntry> queue = getQueue(priority);
141 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700142 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700143 return false;
144 }
145 return queue.add(entry);
146 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800147
Ray Milkey8e5170e2014-04-02 12:09:55 -0700148 /**
149 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700150 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700151 * @return Entry object.
152 */
153 SwitchQueueEntry poll() {
154 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700155 case READY: {
156 for (int i = 0; i < rawQueues.size(); ++i) {
157 SwitchQueueEntry entry = rawQueues.get(i).poll();
158 if (entry != null) {
159 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700160 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700161 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800162
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700163 return null;
164 }
165 case SUSPENDED: {
166 // Only polling from high priority queue
167 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
168 return entry;
169 }
170 default:
171 log.error("Unexpected QueueState: {}", state);
172 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700173 }
174 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800175
Ray Milkey8e5170e2014-04-02 12:09:55 -0700176 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700177 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700178 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700179 * @return True if there are some messages to be sent.
180 */
181 boolean hasMessageToSend() {
182 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700183 case READY:
184 for (Queue<SwitchQueueEntry> queue : rawQueues) {
185 if (!queue.isEmpty()) {
186 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700187 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700188 }
189 break;
190 case SUSPENDED:
191 // Only checking high priority queue
192 return (!getQueue(MsgPriority.HIGH).isEmpty());
193 default:
194 log.error("Unexpected QueueState: {}", state);
195 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700196 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800197
Ray Milkey8e5170e2014-04-02 12:09:55 -0700198 return false;
199 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800200
Ray Milkey8e5170e2014-04-02 12:09:55 -0700201 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700202 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700203 }
204 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800205
Ray Milkey8e5170e2014-04-02 12:09:55 -0700206 /**
207 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700208 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700209 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700210 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700211 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800212
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700213 static BarrierInfo create(long dpid, OFBarrierRequest req) {
214 return new BarrierInfo(dpid, req.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700215 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800216
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700217 static BarrierInfo create(long dpid, OFBarrierReply rpy) {
218 return new BarrierInfo(dpid, rpy.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700219 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800220
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700221 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700222 this.dpid = dpid;
223 this.xid = xid;
224 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800225
Ray Milkey8e5170e2014-04-02 12:09:55 -0700226 // Auto generated code by Eclipse
227 @Override
228 public int hashCode() {
229 final int prime = 31;
230 int result = 1;
231 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700232 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700233 return result;
234 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800235
Ray Milkey8e5170e2014-04-02 12:09:55 -0700236 @Override
237 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700238 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700239 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700240 }
241 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700242 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700243 }
244 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700245 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700246 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800247
Ray Milkey8e5170e2014-04-02 12:09:55 -0700248 BarrierInfo other = (BarrierInfo) obj;
249 return (this.dpid == other.dpid) && (this.xid == other.xid);
250 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800251
Ray Milkey8e5170e2014-04-02 12:09:55 -0700252 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800253
Ray Milkey8e5170e2014-04-02 12:09:55 -0700254 /**
255 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700256 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700257 private class FlowPusherThread extends Thread {
258 private Map<Dpid, SwitchQueue> assignedQueues = new ConcurrentHashMap<>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800259
Ray Milkey8e5170e2014-04-02 12:09:55 -0700260 final Lock queuingLock = new ReentrantLock();
261 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800262
Ray Milkey8e5170e2014-04-02 12:09:55 -0700263 @Override
264 public void run() {
265 this.setName("FlowPusherThread " + this.getId());
266 while (true) {
267 while (!queuesHasMessageToSend()) {
268 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800269
Ray Milkey8e5170e2014-04-02 12:09:55 -0700270 try {
271 // wait for message pushed to queue
272 messagePushed.await();
273 } catch (InterruptedException e) {
274 // Interrupted to be shut down (not an error)
275 log.debug("FlowPusherThread is interrupted");
276 return;
277 } finally {
278 queuingLock.unlock();
279 }
280 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800281
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700282 for (Iterator<Entry<Dpid, SwitchQueue>> it = assignedQueues
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700283 .entrySet().iterator(); it.hasNext();) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700284 Entry<Dpid, SwitchQueue> entry = it.next();
Naoki Shiota47993102014-07-09 14:00:54 -0700285 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800286
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700287 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700288 continue;
289 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800290
Ray Milkey8e5170e2014-04-02 12:09:55 -0700291 synchronized (queue) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700292 processQueue(entry.getKey(), queue, MAX_MESSAGE_SEND);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700293 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
294 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700295 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700296 }
297 }
298 }
299 }
300 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800301
Ray Milkey8e5170e2014-04-02 12:09:55 -0700302 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700303 * Read messages from queue and send them to the switch. If number of
304 * messages excess the limit, stop sending messages.
305 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700306 * @param dpid DPID of the switch to which messages will be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700307 * @param queue Queue of messages.
308 * @param maxMsg Limitation of number of messages to be sent. If set to
309 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700310 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700311 private void processQueue(Dpid dpid, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700312 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700313 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700314 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800315
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700316 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
317 if (sw == null) {
318 // FlowPusher state for this switch will get cleaned up soon
319 // due to the switchDisconnected event
320 log.debug("Switch {} not found when processing queue", dpid);
321 return;
322 }
323
Naoki Shiota47993102014-07-09 14:00:54 -0700324 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700325 int i = 0;
326 while (queue.hasMessageToSend()) {
327 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700328 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700329 break;
330 }
331 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800332
Ray Milkey8e5170e2014-04-02 12:09:55 -0700333 SwitchQueueEntry queueEntry;
334 synchronized (queue) {
335 queueEntry = queue.poll();
336 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800337
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 OFMessage msg = queueEntry.getOFMessage();
339 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700340 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700341 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700342 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700343 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700344 // TODO BOC how do we get the size?
345 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700346 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700347 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700348 }
349 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800350
Ray Milkey8e5170e2014-04-02 12:09:55 -0700351 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700352 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700353 }
354 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800355
Ray Milkey8e5170e2014-04-02 12:09:55 -0700356 private boolean queuesHasMessageToSend() {
357 for (SwitchQueue queue : assignedQueues.values()) {
358 if (queue.hasMessageToSend()) {
359 return true;
360 }
361 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700362
Ray Milkey8e5170e2014-04-02 12:09:55 -0700363 return false;
364 }
Naoki Shiota05334692014-03-18 16:06:36 -0700365
Ray Milkey8e5170e2014-04-02 12:09:55 -0700366 private void notifyMessagePushed() {
367 queuingLock.lock();
368 try {
369 messagePushed.signal();
370 } finally {
371 queuingLock.unlock();
372 }
373 }
374 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700375
Ray Milkey8e5170e2014-04-02 12:09:55 -0700376 /**
377 * Initialize object with one thread.
378 */
379 public FlowPusher() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700380 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700381 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800382
Ray Milkey8e5170e2014-04-02 12:09:55 -0700383 /**
384 * Initialize object with threads of given number.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700385 * <p>
Ray Milkey9526d6f2014-04-10 14:54:15 -0700386 * @param numberThreadValue Number of threads to handle messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700387 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700388 public FlowPusher(int numberThreadValue) {
389 if (numberThreadValue > 0) {
390 numberThread = numberThreadValue;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700391 } else {
Ray Milkey9526d6f2014-04-10 14:54:15 -0700392 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700393 }
394 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800395
Ray Milkey8e5170e2014-04-02 12:09:55 -0700396 /**
397 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700398 * <p>
399 * @param floodlightContext FloodlightModuleContext used for acquiring
400 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700401 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700402 public void init(FloodlightModuleContext floodlightContext) {
403 this.context = floodlightContext;
404 this.floodlightProvider = context
405 .getServiceImpl(IFloodlightProviderService.class);
406 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800407
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700408 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700409 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800410
Ray Milkey8e5170e2014-04-02 12:09:55 -0700411 /**
412 * Begin processing queue.
413 */
414 public void start() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700415 threadMap = new HashMap<>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700416 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700417 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800418
Ray Milkey8e5170e2014-04-02 12:09:55 -0700419 threadMap.put(i, thread);
420 thread.start();
421 }
422 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800423
Ray Milkey8e5170e2014-04-02 12:09:55 -0700424 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700425 public boolean suspend(Dpid dpid) {
426 SwitchQueue queue = getQueue(dpid);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700427
Ray Milkey8e5170e2014-04-02 12:09:55 -0700428 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700429 // create queue in case suspend is called before first message
430 // addition
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700431 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700432 }
433
434 synchronized (queue) {
435 if (queue.state == QueueState.READY) {
436 queue.state = QueueState.SUSPENDED;
437 return true;
438 }
439 return false;
440 }
441 }
442
443 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700444 public boolean resume(Dpid dpid) {
445 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700446
447 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700448 log.error("No queue is attached to DPID: {}", dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700449 return false;
450 }
451
452 synchronized (queue) {
453 if (queue.state == QueueState.SUSPENDED) {
454 queue.state = QueueState.READY;
455
456 // Free the latch if queue has any messages
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700457 FlowPusherThread thread = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700458 if (queue.hasMessageToSend()) {
459 thread.notifyMessagePushed();
460 }
461 return true;
462 }
463 return false;
464 }
465 }
466
467 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700468 public QueueState getState(Dpid dpid) {
469 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700470
471 if (queue == null) {
472 return QueueState.UNKNOWN;
473 }
474
475 return queue.state;
476 }
477
478 /**
479 * Stop processing queue and exit thread.
480 */
481 public void stop() {
482 if (threadMap == null) {
483 return;
484 }
485
486 for (FlowPusherThread t : threadMap.values()) {
487 t.interrupt();
488 }
489 }
490
491 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700492 public void setRate(Dpid dpid, long rate) {
493 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700494 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700495 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700496 }
497
498 if (rate > 0) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700499 log.debug("rate for {} is set to {}", dpid, rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700500 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700501 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700502 }
503 }
504 }
505
506 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700507 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700508 justification = "Future versions of createQueueImpl() might return null")
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700509 public boolean createQueue(Dpid dpid) {
510 SwitchQueue queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700511
512 return (queue != null);
513 }
514
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700515 protected SwitchQueue createQueueImpl(Dpid dpid) {
516 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700517 if (queue != null) {
518 return queue;
519 }
520
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700521 FlowPusherThread proc = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700522 queue = new SwitchQueue();
523 queue.state = QueueState.READY;
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700524 proc.assignedQueues.put(dpid, queue);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700525
526 return queue;
527 }
528
529 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700530 public boolean deleteQueue(Dpid dpid) {
531 return deleteQueue(dpid, false);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700532 }
533
534 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700535 public boolean deleteQueue(Dpid dpid, boolean forceStop) {
536 FlowPusherThread proc = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700537
538 if (forceStop) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700539 SwitchQueue queue = proc.assignedQueues.remove(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700540 if (queue == null) {
541 return false;
542 }
543 return true;
544 } else {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700545 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700546 if (queue == null) {
547 return false;
548 }
549 synchronized (queue) {
550 queue.toBeDeleted = true;
551 }
552 return true;
553 }
554 }
555
556 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700557 public boolean add(Dpid dpid, OFMessage msg) {
558 return add(dpid, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700559 }
560
561 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700562 public boolean add(Dpid dpid, OFMessage msg, MsgPriority priority) {
563 return addMessageImpl(dpid, msg, priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700564 }
565
566 @Override
567 public void pushFlowEntries(
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700568 Collection<Pair<Dpid, FlowEntry>> entries) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700569 pushFlowEntries(entries, MsgPriority.NORMAL);
570 }
571
572 @Override
573 public void pushFlowEntries(
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700574 Collection<Pair<Dpid, FlowEntry>> entries, MsgPriority priority) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700575
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700576 for (Pair<Dpid, FlowEntry> entry : entries) {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700577 add(entry.getLeft(), entry.getRight(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700578 }
579 }
580
581 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700582 public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry) {
583 pushFlowEntry(dpid, flowEntry, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700584 }
585
586 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700587 public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
588 Collection<Pair<Dpid, FlowEntry>> entries = new LinkedList<>();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700589
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700590 entries.add(Pair.of(dpid, flowEntry));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700591 pushFlowEntries(entries, priority);
592 }
593
594 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700595 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700596 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700597 * @param dpid DPID of the switch to which the message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700598 * @param flowEntry FlowEntry object used for creating message.
599 * @return true if message is successfully added to a queue.
600 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700601 private boolean add(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
602 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
603 if (sw == null) {
604 log.warn("Couldn't find switch {} when pushing message", dpid);
605 return false;
606 }
607
Ray Milkeyddcd4922014-04-17 11:21:20 -0700608 //
609 // Create the OpenFlow Flow Modification Entry to push
610 //
Jonathan Harta213bce2014-08-11 15:44:07 -0700611 OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700612 // log.trace("Pushing flow mod {}", fm);
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700613 return addMessageImpl(dpid, fm, priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700614 }
615
616 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700617 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700618 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700619 * @param dpid DPID of the switch to which the message is sent
620 * @param msg message to send to the switch
621 * @param priority priority of the message
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700622 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700623 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700624 protected boolean addMessageImpl(Dpid dpid, OFMessage msg, MsgPriority priority) {
625 FlowPusherThread thread = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700626
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700627 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700628
629 // create queue at first addition of message
630 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700631 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700632 }
633
634 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
635
636 synchronized (queue) {
637 queue.add(entry, priority);
638 if (log.isTraceEnabled()) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700639 log.trace("Message is pushed to switch {}: {}",
640 dpid, entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700641 }
642 }
643
644 thread.notifyMessagePushed();
645
646 return true;
647 }
648
649 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700650 public OFBarrierReply barrier(Dpid dpid) {
651 OFMessageFuture<OFBarrierReply> future = barrierAsync(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700652 if (future == null) {
653 return null;
654 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700655 try {
656 return future.get();
657 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700658 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700659 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700660 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700661 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700662 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700663 }
664
665 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700666 public OFMessageFuture<OFBarrierReply> barrierAsync(Dpid dpid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700667 // TODO creation of message and future should be moved to OFSwitchImpl
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700668 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700669 if (sw == null) {
670 return null;
671 }
672
673 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700674 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
675 (int) msg.getXid());
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700676 barrierFutures.put(BarrierInfo.create(dpid.value(), msg), future);
677 addMessageImpl(dpid, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700678 return future;
679 }
680
681 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Jonathan Harta213bce2014-08-11 15:44:07 -0700682 OFFactory factory = sw.getFactory();
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700683 if (factory == null) {
684 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
685 sw.getOFVersion());
686 return null;
687 }
688 return factory.buildBarrierRequest()
689 .setXid(sw.getNextTransactionId())
690 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700691 }
692
693 /**
694 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700695 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700696 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700697 * @return Queue object
698 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700699 protected SwitchQueue getQueue(Dpid dpid) {
700 if (dpid == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700701 return null;
702 }
703
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700704 FlowPusherThread th = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700705 if (th == null) {
706 return null;
707 }
708
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700709 return th.assignedQueues.get(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700710 }
711
712 /**
713 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700714 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700715 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700716 * @return Hash value
717 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700718 protected long getHash(long dpid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700719 // This code assumes DPID is sequentially assigned.
720 // TODO consider equalization algorithm
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700721 return dpid % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700722 }
723
724 /**
725 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700726 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700727 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700728 * @return Thread object
729 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700730 protected FlowPusherThread getProcessingThread(Dpid dpid) {
731 long hash = getHash(dpid.value());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700732
733 return threadMap.get(hash);
734 }
735
736 @Override
737 public String getName() {
738 return "flowpusher";
739 }
740
741 @Override
742 public boolean isCallbackOrderingPrereq(OFType type, String name) {
743 return false;
744 }
745
746 @Override
747 public boolean isCallbackOrderingPostreq(OFType type, String name) {
748 return false;
749 }
750
751 @Override
752 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
753 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700754 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700755 }
756
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700757 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700758 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700759 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700760 return Command.CONTINUE;
761 }
762
763 OFBarrierReply reply = (OFBarrierReply) msg;
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700764 BarrierInfo info = BarrierInfo.create(sw.getId(), reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700765 // Deliver future if exists
766 OFBarrierReplyFuture future = barrierFutures.get(info);
767 if (future != null) {
768 future.deliverFuture(sw, msg);
769 barrierFutures.remove(info);
770 }
771
772 return Command.CONTINUE;
773 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700774
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700775}