blob: 419dc439da9f240722afea197f6013805a28c95f [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070027import net.onrc.onos.core.intent.FlowEntry;
Jonathan Hart23701d12014-04-03 10:45:48 -070028import net.onrc.onos.core.util.Pair;
Jonathan Harta99ec672014-04-03 11:30:34 -070029
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070030import org.projectfloodlight.openflow.protocol.OFBarrierReply;
31import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
32import org.projectfloodlight.openflow.protocol.OFFactory;
33import org.projectfloodlight.openflow.protocol.OFFlowMod;
34import org.projectfloodlight.openflow.protocol.OFMessage;
35import org.projectfloodlight.openflow.protocol.OFType;
36import org.projectfloodlight.openflow.protocol.OFVersion;
Jonathan Harta99ec672014-04-03 11:30:34 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070039
Yuta HIGUCHIafadeda2014-07-24 17:11:07 -070040import com.google.common.cache.CacheBuilder;
41
Jonathan Hartc00f5c22014-06-10 15:14:40 -070042import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
43
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070044/**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070045 * FlowPusher is a implementation of FlowPusherService. FlowPusher assigns one
46 * message queue instance for each one switch. Number of message processing
47 * threads is configurable by constructor, and one thread can handle multiple
48 * message queues. Each queue will be assigned to a thread according to hash
49 * function defined by getHash(). Each processing thread reads messages from
50 * queues and sends it to switches in round-robin. Processing thread also
51 * calculates rate of sending to suppress excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070052 */
Ray Milkey1584ec82014-04-10 11:58:30 -070053public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070054 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070055 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080057 // Number of messages sent to switch at once
58 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080059
Ray Milkey8e5170e2014-04-02 12:09:55 -070060 private static class SwitchQueueEntry {
61 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070062
Ray Milkey8e5170e2014-04-02 12:09:55 -070063 public SwitchQueueEntry(OFMessage msg) {
64 this.msg = msg;
65 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070066
Ray Milkey8e5170e2014-04-02 12:09:55 -070067 public OFMessage getOFMessage() {
68 return msg;
69 }
70 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070071
Ray Milkey8e5170e2014-04-02 12:09:55 -070072 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070073 * SwitchQueue represents message queue attached to a switch. This consists
74 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -070075 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -070076 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -070077 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -070078 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080079
Ray Milkey8e5170e2014-04-02 12:09:55 -070080 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070081 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -070082 long lastSentTime = 0;
83 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -070084
Ray Milkey8e5170e2014-04-02 12:09:55 -070085 // "To be deleted" flag
86 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080087
Ray Milkey8e5170e2014-04-02 12:09:55 -070088 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -070089 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -070090 MsgPriority.values().length);
91 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -070092 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -070093 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -080094
Ray Milkey8e5170e2014-04-02 12:09:55 -070095 state = QueueState.READY;
96 }
Naoki Shiotae3199732013-11-25 16:14:43 -080097
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -070099 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700100 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700101 * @param current Current time
102 * @return true if within the rate
103 */
104 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700105 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700106 // no limitation
107 return true;
108 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800109
Ray Milkey2476cac2014-04-08 11:03:21 -0700110 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700111 return false;
112 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800113
Ray Milkey8e5170e2014-04-02 12:09:55 -0700114 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700115 long rate = lastSentSize / (current - lastSentTime);
116 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700117 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800118
Ray Milkey8e5170e2014-04-02 12:09:55 -0700119 /**
120 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700121 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700122 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700123 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700124 */
125 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700126 lastSentTime = current;
127 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700129
Ray Milkey8e5170e2014-04-02 12:09:55 -0700130 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
131 Queue<SwitchQueueEntry> queue = getQueue(priority);
132 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700133 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700134 return false;
135 }
136 return queue.add(entry);
137 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800138
Ray Milkey8e5170e2014-04-02 12:09:55 -0700139 /**
140 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700141 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700142 * @return Entry object.
143 */
144 SwitchQueueEntry poll() {
145 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700146 case READY: {
147 for (int i = 0; i < rawQueues.size(); ++i) {
148 SwitchQueueEntry entry = rawQueues.get(i).poll();
149 if (entry != null) {
150 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700151 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700152 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800153
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700154 return null;
155 }
156 case SUSPENDED: {
157 // Only polling from high priority queue
158 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
159 return entry;
160 }
161 default:
162 log.error("Unexpected QueueState: {}", state);
163 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700164 }
165 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800166
Ray Milkey8e5170e2014-04-02 12:09:55 -0700167 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700168 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700169 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700170 * @return True if there are some messages to be sent.
171 */
172 boolean hasMessageToSend() {
173 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700174 case READY:
175 for (Queue<SwitchQueueEntry> queue : rawQueues) {
176 if (!queue.isEmpty()) {
177 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700178 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700179 }
180 break;
181 case SUSPENDED:
182 // Only checking high priority queue
183 return (!getQueue(MsgPriority.HIGH).isEmpty());
184 default:
185 log.error("Unexpected QueueState: {}", state);
186 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700187 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800188
Ray Milkey8e5170e2014-04-02 12:09:55 -0700189 return false;
190 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800191
Ray Milkey8e5170e2014-04-02 12:09:55 -0700192 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700193 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700194 }
195 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800196
Ray Milkey8e5170e2014-04-02 12:09:55 -0700197 /**
198 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700199 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700200 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700201 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700202 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800203
Ray Milkey8e5170e2014-04-02 12:09:55 -0700204 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
205 return new BarrierInfo(sw.getId(), req.getXid());
206 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800207
Ray Milkey8e5170e2014-04-02 12:09:55 -0700208 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
209 return new BarrierInfo(sw.getId(), rpy.getXid());
210 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800211
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700212 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700213 this.dpid = dpid;
214 this.xid = xid;
215 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800216
Ray Milkey8e5170e2014-04-02 12:09:55 -0700217 // Auto generated code by Eclipse
218 @Override
219 public int hashCode() {
220 final int prime = 31;
221 int result = 1;
222 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700223 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700224 return result;
225 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800226
Ray Milkey8e5170e2014-04-02 12:09:55 -0700227 @Override
228 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700229 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700230 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700231 }
232 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700233 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700234 }
235 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700236 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700237 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800238
Ray Milkey8e5170e2014-04-02 12:09:55 -0700239 BarrierInfo other = (BarrierInfo) obj;
240 return (this.dpid == other.dpid) && (this.xid == other.xid);
241 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800242
Ray Milkey8e5170e2014-04-02 12:09:55 -0700243 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800244
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700245 private FloodlightModuleContext context = null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700246 private IThreadPoolService threadPool = null;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700247 private IFloodlightProviderService floodlightProvider = null;
248 protected Map<OFVersion, OFFactory> ofFactoryMap = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800249
Ray Milkey8e5170e2014-04-02 12:09:55 -0700250 // Map of threads versus dpid
251 private Map<Long, FlowPusherThread> threadMap = null;
252 // Map from (DPID and transaction ID) to Future objects.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700253 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
254 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800255
Ray Milkey2476cac2014-04-08 11:03:21 -0700256 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800257
Ray Milkey8e5170e2014-04-02 12:09:55 -0700258 /**
259 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700260 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700261 private static class FlowPusherThread extends Thread {
262 // Weak ConcurrentHashMap
263 private Map<IOFSwitch, SwitchQueue> assignedQueues = CacheBuilder.newBuilder()
264 .weakKeys()
265 .<IOFSwitch, SwitchQueue>build().asMap();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800266
Ray Milkey8e5170e2014-04-02 12:09:55 -0700267 final Lock queuingLock = new ReentrantLock();
268 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800269
Ray Milkey8e5170e2014-04-02 12:09:55 -0700270 @Override
271 public void run() {
272 this.setName("FlowPusherThread " + this.getId());
273 while (true) {
274 while (!queuesHasMessageToSend()) {
275 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800276
Ray Milkey8e5170e2014-04-02 12:09:55 -0700277 try {
278 // wait for message pushed to queue
279 messagePushed.await();
280 } catch (InterruptedException e) {
281 // Interrupted to be shut down (not an error)
282 log.debug("FlowPusherThread is interrupted");
283 return;
284 } finally {
285 queuingLock.unlock();
286 }
287 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800288
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700289 for (Iterator<Entry<IOFSwitch, SwitchQueue>> it = assignedQueues
290 .entrySet().iterator(); it.hasNext();) {
Naoki Shiota47993102014-07-09 14:00:54 -0700291 Entry<IOFSwitch, SwitchQueue> entry = it.next();
292 IOFSwitch sw = entry.getKey();
293 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800294
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700295 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700296 continue;
297 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800298
Ray Milkey8e5170e2014-04-02 12:09:55 -0700299 synchronized (queue) {
300 processQueue(sw, queue, MAX_MESSAGE_SEND);
301 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
302 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700303 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700304 }
305 }
306 }
307 }
308 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800309
Ray Milkey8e5170e2014-04-02 12:09:55 -0700310 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700311 * Read messages from queue and send them to the switch. If number of
312 * messages excess the limit, stop sending messages.
313 * <p>
314 * @param sw Switch to which messages will be sent.
315 * @param queue Queue of messages.
316 * @param maxMsg Limitation of number of messages to be sent. If set to
317 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700318 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700319 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700320 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700321 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700322 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800323
Naoki Shiota47993102014-07-09 14:00:54 -0700324 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700325 int i = 0;
326 while (queue.hasMessageToSend()) {
327 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700328 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700329 break;
330 }
331 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800332
Ray Milkey8e5170e2014-04-02 12:09:55 -0700333 SwitchQueueEntry queueEntry;
334 synchronized (queue) {
335 queueEntry = queue.poll();
336 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800337
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 OFMessage msg = queueEntry.getOFMessage();
339 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700340 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700341 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700342 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700343 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700344 // TODO BOC how do we get the size?
345 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700346 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700347 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700348 }
349 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800350
Ray Milkey8e5170e2014-04-02 12:09:55 -0700351 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700352 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700353 }
354 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800355
Ray Milkey8e5170e2014-04-02 12:09:55 -0700356 private boolean queuesHasMessageToSend() {
357 for (SwitchQueue queue : assignedQueues.values()) {
358 if (queue.hasMessageToSend()) {
359 return true;
360 }
361 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700362
Ray Milkey8e5170e2014-04-02 12:09:55 -0700363 return false;
364 }
Naoki Shiota05334692014-03-18 16:06:36 -0700365
Ray Milkey8e5170e2014-04-02 12:09:55 -0700366 private void notifyMessagePushed() {
367 queuingLock.lock();
368 try {
369 messagePushed.signal();
370 } finally {
371 queuingLock.unlock();
372 }
373 }
374 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700375
Ray Milkey8e5170e2014-04-02 12:09:55 -0700376 /**
377 * Initialize object with one thread.
378 */
379 public FlowPusher() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700380 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700381 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800382
Ray Milkey8e5170e2014-04-02 12:09:55 -0700383 /**
384 * Initialize object with threads of given number.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700385 * <p>
Ray Milkey9526d6f2014-04-10 14:54:15 -0700386 * @param numberThreadValue Number of threads to handle messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700387 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700388 public FlowPusher(int numberThreadValue) {
389 if (numberThreadValue > 0) {
390 numberThread = numberThreadValue;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700391 } else {
Ray Milkey9526d6f2014-04-10 14:54:15 -0700392 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700393 }
394 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800395
Ray Milkey8e5170e2014-04-02 12:09:55 -0700396 /**
397 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700398 * <p>
399 * @param floodlightContext FloodlightModuleContext used for acquiring
400 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700401 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700402 public void init(FloodlightModuleContext floodlightContext) {
403 this.context = floodlightContext;
404 this.floodlightProvider = context
405 .getServiceImpl(IFloodlightProviderService.class);
406 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800407
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700408 ofFactoryMap = new HashMap<>();
409 ofFactoryMap.put(OFVersion.OF_10, floodlightProvider.getOFMessageFactory_10());
410 ofFactoryMap.put(OFVersion.OF_13, floodlightProvider.getOFMessageFactory_13());
411 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700412 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800413
Ray Milkey8e5170e2014-04-02 12:09:55 -0700414 /**
415 * Begin processing queue.
416 */
417 public void start() {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700418 // TODO BOC
419 // if (factory == null) {
420 // log.error("FlowPusher not yet initialized.");
421 // return;
422 // }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800423
Ray Milkey8e5170e2014-04-02 12:09:55 -0700424 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700425 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700426 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800427
Ray Milkey8e5170e2014-04-02 12:09:55 -0700428 threadMap.put(i, thread);
429 thread.start();
430 }
431 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800432
Ray Milkey8e5170e2014-04-02 12:09:55 -0700433 @Override
434 public boolean suspend(IOFSwitch sw) {
435 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700436
Ray Milkey8e5170e2014-04-02 12:09:55 -0700437 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700438 // create queue in case suspend is called before first message
439 // addition
Ray Milkey8e5170e2014-04-02 12:09:55 -0700440 queue = createQueueImpl(sw);
441 }
442
443 synchronized (queue) {
444 if (queue.state == QueueState.READY) {
445 queue.state = QueueState.SUSPENDED;
446 return true;
447 }
448 return false;
449 }
450 }
451
452 @Override
453 public boolean resume(IOFSwitch sw) {
454 SwitchQueue queue = getQueue(sw);
455
456 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700457 log.error("No queue is attached to DPID: {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700458 return false;
459 }
460
461 synchronized (queue) {
462 if (queue.state == QueueState.SUSPENDED) {
463 queue.state = QueueState.READY;
464
465 // Free the latch if queue has any messages
466 FlowPusherThread thread = getProcessingThread(sw);
467 if (queue.hasMessageToSend()) {
468 thread.notifyMessagePushed();
469 }
470 return true;
471 }
472 return false;
473 }
474 }
475
476 @Override
477 public QueueState getState(IOFSwitch sw) {
478 SwitchQueue queue = getQueue(sw);
479
480 if (queue == null) {
481 return QueueState.UNKNOWN;
482 }
483
484 return queue.state;
485 }
486
487 /**
488 * Stop processing queue and exit thread.
489 */
490 public void stop() {
491 if (threadMap == null) {
492 return;
493 }
494
495 for (FlowPusherThread t : threadMap.values()) {
496 t.interrupt();
497 }
498 }
499
500 @Override
501 public void setRate(IOFSwitch sw, long rate) {
502 SwitchQueue queue = getQueue(sw);
503 if (queue == null) {
504 queue = createQueueImpl(sw);
505 }
506
507 if (rate > 0) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700508 log.debug("rate for {} is set to {}", sw.getStringId(), rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700509 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700510 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700511 }
512 }
513 }
514
515 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700516 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700517 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700518 public boolean createQueue(IOFSwitch sw) {
519 SwitchQueue queue = createQueueImpl(sw);
520
521 return (queue != null);
522 }
523
524 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
525 SwitchQueue queue = getQueue(sw);
526 if (queue != null) {
527 return queue;
528 }
529
530 FlowPusherThread proc = getProcessingThread(sw);
531 queue = new SwitchQueue();
532 queue.state = QueueState.READY;
533 proc.assignedQueues.put(sw, queue);
534
535 return queue;
536 }
537
538 @Override
539 public boolean deleteQueue(IOFSwitch sw) {
540 return deleteQueue(sw, false);
541 }
542
543 @Override
544 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
545 FlowPusherThread proc = getProcessingThread(sw);
546
547 if (forceStop) {
548 SwitchQueue queue = proc.assignedQueues.remove(sw);
549 if (queue == null) {
550 return false;
551 }
552 return true;
553 } else {
554 SwitchQueue queue = getQueue(sw);
555 if (queue == null) {
556 return false;
557 }
558 synchronized (queue) {
559 queue.toBeDeleted = true;
560 }
561 return true;
562 }
563 }
564
565 @Override
566 public boolean add(IOFSwitch sw, OFMessage msg) {
567 return add(sw, msg, MsgPriority.NORMAL);
568 }
569
570 @Override
571 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
572 return addMessageImpl(sw, msg, priority);
573 }
574
575 @Override
576 public void pushFlowEntries(
577 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
578 pushFlowEntries(entries, MsgPriority.NORMAL);
579 }
580
581 @Override
582 public void pushFlowEntries(
583 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
584
585 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Sho SHIMIZU26d77892014-06-10 11:07:06 -0700586 add(entry.getFirst(), entry.getSecond(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700587 }
588 }
589
590 @Override
591 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
592 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
593 }
594
595 @Override
596 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
597 Collection<Pair<IOFSwitch, FlowEntry>> entries =
598 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
599
600 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
601 pushFlowEntries(entries, priority);
602 }
603
604 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700605 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700606 * <p>
607 * @param sw Switch to which message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700608 * @param flowEntry FlowEntry object used for creating message.
609 * @return true if message is successfully added to a queue.
610 */
611 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
612 //
613 // Create the OpenFlow Flow Modification Entry to push
614 //
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700615 OFFlowMod fm = flowEntry.buildFlowMod(ofFactoryMap.get(sw.getOFVersion()));
616 // log.trace("Pushing flow mod {}", fm);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700617 return addMessageImpl(sw, fm, priority);
618 }
619
620 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700621 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700622 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700623 * @param sw
624 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700625 * @param priority
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700626 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700627 */
628 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
629 FlowPusherThread thread = getProcessingThread(sw);
630
631 SwitchQueue queue = getQueue(sw);
632
633 // create queue at first addition of message
634 if (queue == null) {
635 queue = createQueueImpl(sw);
636 }
637
638 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
639
640 synchronized (queue) {
641 queue.add(entry, priority);
642 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700643 log.trace("Message is pushed to switch {}: {}", sw.getStringId(), entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700644 }
645 }
646
647 thread.notifyMessagePushed();
648
649 return true;
650 }
651
652 @Override
653 public OFBarrierReply barrier(IOFSwitch sw) {
654 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
655 if (future == null) {
656 return null;
657 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700658 try {
659 return future.get();
660 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700661 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700662 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700663 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700664 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700665 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700666 }
667
668 @Override
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700669 public OFMessageFuture<OFBarrierReply> barrierAsync(IOFSwitch sw) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700670 // TODO creation of message and future should be moved to OFSwitchImpl
671
672 if (sw == null) {
673 return null;
674 }
675
676 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700677 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
678 (int) msg.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700679 barrierFutures.put(BarrierInfo.create(sw, msg), future);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700680 addMessageImpl(sw, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700681 return future;
682 }
683
684 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700685 OFFactory factory = ofFactoryMap.get(sw.getOFVersion());
686 if (factory == null) {
687 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
688 sw.getOFVersion());
689 return null;
690 }
691 return factory.buildBarrierRequest()
692 .setXid(sw.getNextTransactionId())
693 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700694 }
695
696 /**
697 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700698 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700699 * @param sw Switch object
700 * @return Queue object
701 */
702 protected SwitchQueue getQueue(IOFSwitch sw) {
703 if (sw == null) {
704 return null;
705 }
706
707 FlowPusherThread th = getProcessingThread(sw);
708 if (th == null) {
709 return null;
710 }
711
712 return th.assignedQueues.get(sw);
713 }
714
715 /**
716 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700717 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700718 * @param sw Switch object
719 * @return Hash value
720 */
721 protected long getHash(IOFSwitch sw) {
722 // This code assumes DPID is sequentially assigned.
723 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -0700724 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700725 }
726
727 /**
728 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700729 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700730 * @param sw Switch object
731 * @return Thread object
732 */
733 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
734 long hash = getHash(sw);
735
736 return threadMap.get(hash);
737 }
738
739 @Override
740 public String getName() {
741 return "flowpusher";
742 }
743
744 @Override
745 public boolean isCallbackOrderingPrereq(OFType type, String name) {
746 return false;
747 }
748
749 @Override
750 public boolean isCallbackOrderingPostreq(OFType type, String name) {
751 return false;
752 }
753
754 @Override
755 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
756 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700757 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700758 }
759
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700760 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700761 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700762 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700763 return Command.CONTINUE;
764 }
765
766 OFBarrierReply reply = (OFBarrierReply) msg;
767 BarrierInfo info = BarrierInfo.create(sw, reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700768 // Deliver future if exists
769 OFBarrierReplyFuture future = barrierFutures.get(info);
770 if (future != null) {
771 future.deliverFuture(sw, msg);
772 barrierFutures.remove(info);
773 }
774
775 return Command.CONTINUE;
776 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700777
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700778}