blob: 48b2a60fb26e469d331c34db91751638a76ea456 [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070021import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
Jonathan Hart23701d12014-04-03 10:45:48 -070030import net.onrc.onos.core.util.FlowEntry;
31import net.onrc.onos.core.util.FlowEntryAction;
Jonathan Harta99ec672014-04-03 11:30:34 -070032import net.onrc.onos.core.util.FlowEntryAction.ActionEnqueue;
33import net.onrc.onos.core.util.FlowEntryAction.ActionOutput;
34import net.onrc.onos.core.util.FlowEntryAction.ActionSetEthernetAddr;
35import net.onrc.onos.core.util.FlowEntryAction.ActionSetIPv4Addr;
36import net.onrc.onos.core.util.FlowEntryAction.ActionSetIpToS;
37import net.onrc.onos.core.util.FlowEntryAction.ActionSetTcpUdpPort;
38import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanId;
39import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanPriority;
40import net.onrc.onos.core.util.FlowEntryAction.ActionStripVlan;
Jonathan Hart23701d12014-04-03 10:45:48 -070041import net.onrc.onos.core.util.FlowEntryActions;
42import net.onrc.onos.core.util.FlowEntryMatch;
43import net.onrc.onos.core.util.FlowEntryUserState;
44import net.onrc.onos.core.util.IPv4Net;
45import net.onrc.onos.core.util.Pair;
46import net.onrc.onos.core.util.Port;
Jonathan Harta99ec672014-04-03 11:30:34 -070047
48import org.openflow.protocol.OFBarrierReply;
49import org.openflow.protocol.OFBarrierRequest;
50import org.openflow.protocol.OFFlowMod;
51import org.openflow.protocol.OFMatch;
52import org.openflow.protocol.OFMessage;
53import org.openflow.protocol.OFPacketOut;
54import org.openflow.protocol.OFPort;
55import org.openflow.protocol.OFType;
56import org.openflow.protocol.action.OFAction;
57import org.openflow.protocol.action.OFActionDataLayerDestination;
58import org.openflow.protocol.action.OFActionDataLayerSource;
59import org.openflow.protocol.action.OFActionEnqueue;
60import org.openflow.protocol.action.OFActionNetworkLayerDestination;
61import org.openflow.protocol.action.OFActionNetworkLayerSource;
62import org.openflow.protocol.action.OFActionNetworkTypeOfService;
63import org.openflow.protocol.action.OFActionOutput;
64import org.openflow.protocol.action.OFActionStripVirtualLan;
65import org.openflow.protocol.action.OFActionTransportLayerDestination;
66import org.openflow.protocol.action.OFActionTransportLayerSource;
67import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
68import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
69import org.openflow.protocol.factory.BasicFactory;
70import org.slf4j.Logger;
71import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072
73/**
Naoki Shiotab485d412013-11-26 12:04:19 -080074 * FlowPusher is an implementation of FlowPusherService.
75 * FlowPusher assigns one message queue instance for each one switch.
76 * Number of message processing threads is configurable by constructor, and
Ray Milkey8e5170e2014-04-02 12:09:55 -070077 * one thread can handle multiple message queues. Each queue will be assigned to
Naoki Shiotab485d412013-11-26 12:04:19 -080078 * a thread according to hash function defined by getHash().
79 * Each processing thread reads messages from queues and sends them to switches
80 * in round-robin. Processing thread also calculates rate of sending to suppress
81 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070082 *
Ray Milkey8e5170e2014-04-02 12:09:55 -070083 * @author Naoki Shiota
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070084 */
Ray Milkey1584ec82014-04-10 11:58:30 -070085public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070086 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070087 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080088
Naoki Shiota7d0cf272013-11-05 10:18:12 -080089 // TODO: Values copied from elsewhere (class LearningSwitch).
90 // The local copy should go away!
91 //
92 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
Ray Milkey8e5170e2014-04-02 12:09:55 -070093 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
94
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080095 // Number of messages sent to switch at once
96 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080097
Ray Milkey8e5170e2014-04-02 12:09:55 -070098 private static class SwitchQueueEntry {
99 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700100
Ray Milkey8e5170e2014-04-02 12:09:55 -0700101 public SwitchQueueEntry(OFMessage msg) {
102 this.msg = msg;
103 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700104
Ray Milkey8e5170e2014-04-02 12:09:55 -0700105 public OFMessage getOFMessage() {
106 return msg;
107 }
108 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700109
Ray Milkey8e5170e2014-04-02 12:09:55 -0700110 /**
111 * SwitchQueue represents message queue attached to a switch.
112 * This consists of queue itself and variables used for limiting sending rate.
113 *
114 * @author Naoki Shiota
115 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -0700116 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -0700117 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700118 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800119
Ray Milkey8e5170e2014-04-02 12:09:55 -0700120 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Ray Milkey2476cac2014-04-08 11:03:21 -0700121 long maxRate = 0; // 0 indicates no limitation
122 long lastSentTime = 0;
123 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -0700124
Ray Milkey8e5170e2014-04-02 12:09:55 -0700125 // "To be deleted" flag
126 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800127
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700129 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -0700130 MsgPriority.values().length);
131 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700132 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700133 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800134
Ray Milkey8e5170e2014-04-02 12:09:55 -0700135 state = QueueState.READY;
136 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800137
Ray Milkey8e5170e2014-04-02 12:09:55 -0700138 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700139 * Check if sending rate is within the rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700140 *
141 * @param current Current time
142 * @return true if within the rate
143 */
144 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700145 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700146 // no limitation
147 return true;
148 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800149
Ray Milkey2476cac2014-04-08 11:03:21 -0700150 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700151 return false;
152 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800153
Ray Milkey8e5170e2014-04-02 12:09:55 -0700154 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700155 long rate = lastSentSize / (current - lastSentTime);
156 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700157 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800158
Ray Milkey8e5170e2014-04-02 12:09:55 -0700159 /**
160 * Log time and size of last sent data.
161 *
162 * @param current Time to be sent.
163 * @param size Size of sent data (in bytes).
164 */
165 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700166 lastSentTime = current;
167 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700168 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700169
Ray Milkey8e5170e2014-04-02 12:09:55 -0700170 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
171 Queue<SwitchQueueEntry> queue = getQueue(priority);
172 if (queue == null) {
173 log.error("Unexpected priority : ", priority);
174 return false;
175 }
176 return queue.add(entry);
177 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800178
Ray Milkey8e5170e2014-04-02 12:09:55 -0700179 /**
180 * Poll single appropriate entry object according to QueueState.
181 *
182 * @return Entry object.
183 */
184 SwitchQueueEntry poll() {
185 switch (state) {
186 case READY: {
Ray Milkey2476cac2014-04-08 11:03:21 -0700187 for (int i = 0; i < rawQueues.size(); ++i) {
188 SwitchQueueEntry entry = rawQueues.get(i).poll();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700189 if (entry != null) {
190 return entry;
191 }
192 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800193
Ray Milkey8e5170e2014-04-02 12:09:55 -0700194 return null;
195 }
196 case SUSPENDED: {
197 // Only polling from high priority queue
198 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
199 return entry;
200 }
201 default:
202 log.error("Unexpected QueueState : ", state);
203 return null;
204 }
205 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800206
Ray Milkey8e5170e2014-04-02 12:09:55 -0700207 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700208 * Check if this object has any messages in the queues to be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700209 *
210 * @return True if there are some messages to be sent.
211 */
212 boolean hasMessageToSend() {
213 switch (state) {
214 case READY:
Ray Milkey2476cac2014-04-08 11:03:21 -0700215 for (Queue<SwitchQueueEntry> queue : rawQueues) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700216 if (!queue.isEmpty()) {
217 return true;
218 }
219 }
220 break;
221 case SUSPENDED:
222 // Only checking high priority queue
223 return (!getQueue(MsgPriority.HIGH).isEmpty());
224 default:
225 log.error("Unexpected QueueState : ", state);
226 return false;
227 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800228
Ray Milkey8e5170e2014-04-02 12:09:55 -0700229 return false;
230 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800231
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700233 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700234 }
235 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800236
Ray Milkey8e5170e2014-04-02 12:09:55 -0700237 /**
238 * BarrierInfo holds information to specify barrier message sent to switch.
239 *
240 * @author Naoki
241 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700242 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700243 final long dpid;
244 final int xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800245
Ray Milkey8e5170e2014-04-02 12:09:55 -0700246 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
247 return new BarrierInfo(sw.getId(), req.getXid());
248 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800249
Ray Milkey8e5170e2014-04-02 12:09:55 -0700250 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
251 return new BarrierInfo(sw.getId(), rpy.getXid());
252 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800253
Ray Milkey8e5170e2014-04-02 12:09:55 -0700254 private BarrierInfo(long dpid, int xid) {
255 this.dpid = dpid;
256 this.xid = xid;
257 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800258
Ray Milkey8e5170e2014-04-02 12:09:55 -0700259 // Auto generated code by Eclipse
260 @Override
261 public int hashCode() {
262 final int prime = 31;
263 int result = 1;
264 result = prime * result + (int) (dpid ^ (dpid >>> 32));
265 result = prime * result + xid;
266 return result;
267 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800268
Ray Milkey8e5170e2014-04-02 12:09:55 -0700269 @Override
270 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700271 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700272 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700273 }
274 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700275 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700276 }
277 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700278 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700279 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800280
Ray Milkey8e5170e2014-04-02 12:09:55 -0700281 BarrierInfo other = (BarrierInfo) obj;
282 return (this.dpid == other.dpid) && (this.xid == other.xid);
283 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800284
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800285
Ray Milkey8e5170e2014-04-02 12:09:55 -0700286 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800287
Ray Milkey8e5170e2014-04-02 12:09:55 -0700288 private OFMessageDamper messageDamper = null;
289 private IThreadPoolService threadPool = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800290
Ray Milkey8e5170e2014-04-02 12:09:55 -0700291 private FloodlightContext context = null;
292 private BasicFactory factory = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800293
Ray Milkey8e5170e2014-04-02 12:09:55 -0700294 // Map of threads versus dpid
295 private Map<Long, FlowPusherThread> threadMap = null;
296 // Map from (DPID and transaction ID) to Future objects.
297 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
298 = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800299
Ray Milkey2476cac2014-04-08 11:03:21 -0700300 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800301
Ray Milkey8e5170e2014-04-02 12:09:55 -0700302 /**
303 * Main thread that reads messages from queues and sends them to switches.
304 *
305 * @author Naoki Shiota
306 */
307 private class FlowPusherThread extends Thread {
308 private Map<IOFSwitch, SwitchQueue> assignedQueues
309 = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800310
Ray Milkey8e5170e2014-04-02 12:09:55 -0700311 final Lock queuingLock = new ReentrantLock();
312 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800313
Ray Milkey8e5170e2014-04-02 12:09:55 -0700314 @Override
315 public void run() {
316 this.setName("FlowPusherThread " + this.getId());
317 while (true) {
318 while (!queuesHasMessageToSend()) {
319 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800320
Ray Milkey8e5170e2014-04-02 12:09:55 -0700321 try {
322 // wait for message pushed to queue
323 messagePushed.await();
324 } catch (InterruptedException e) {
325 // Interrupted to be shut down (not an error)
326 log.debug("FlowPusherThread is interrupted");
327 return;
328 } finally {
329 queuingLock.unlock();
330 }
331 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800332
Ray Milkey8e5170e2014-04-02 12:09:55 -0700333 // for safety of concurrent access, copy set of key objects
334 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
335 for (IOFSwitch sw : assignedQueues.keySet()) {
336 keys.add(sw);
337 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800338
Ray Milkey8e5170e2014-04-02 12:09:55 -0700339 for (IOFSwitch sw : keys) {
340 SwitchQueue queue = assignedQueues.get(sw);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800341
Ray Milkey8e5170e2014-04-02 12:09:55 -0700342 if (sw == null || queue == null) {
343 continue;
344 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800345
Ray Milkey8e5170e2014-04-02 12:09:55 -0700346 synchronized (queue) {
347 processQueue(sw, queue, MAX_MESSAGE_SEND);
348 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
349 // remove queue if flagged to be.
350 assignedQueues.remove(sw);
351 }
352 }
353 }
354 }
355 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800356
Ray Milkey8e5170e2014-04-02 12:09:55 -0700357 /**
358 * Read messages from queue and send them to the switch.
359 * If number of messages excess the limit, stop sending messages.
360 *
361 * @param sw Switch to which messages will be sent.
362 * @param queue Queue of messages.
363 * @param max_msg Limitation of number of messages to be sent. If set to 0,
364 * all messages in queue will be sent.
365 */
366 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
367 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700368 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700369 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800370
Ray Milkey2476cac2014-04-08 11:03:21 -0700371 if (queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700372 int i = 0;
373 while (queue.hasMessageToSend()) {
374 // Number of messages excess the limit
375 if (0 < max_msg && max_msg <= i) {
376 break;
377 }
378 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800379
Ray Milkey8e5170e2014-04-02 12:09:55 -0700380 SwitchQueueEntry queueEntry;
381 synchronized (queue) {
382 queueEntry = queue.poll();
383 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800384
Ray Milkey8e5170e2014-04-02 12:09:55 -0700385 OFMessage msg = queueEntry.getOFMessage();
386 try {
387 messageDamper.write(sw, msg, context);
388 if (log.isTraceEnabled()) {
389 log.trace("Pusher sends message : {}", msg);
390 }
391 size += msg.getLength();
392 } catch (IOException e) {
393 e.printStackTrace();
394 log.error("Exception in sending message ({}) : {}", msg, e);
395 }
396 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800397
Ray Milkey8e5170e2014-04-02 12:09:55 -0700398 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700399 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700400 }
401 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800402
Ray Milkey8e5170e2014-04-02 12:09:55 -0700403 private boolean queuesHasMessageToSend() {
404 for (SwitchQueue queue : assignedQueues.values()) {
405 if (queue.hasMessageToSend()) {
406 return true;
407 }
408 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700409
Ray Milkey8e5170e2014-04-02 12:09:55 -0700410 return false;
411 }
Naoki Shiota05334692014-03-18 16:06:36 -0700412
Ray Milkey8e5170e2014-04-02 12:09:55 -0700413 private void notifyMessagePushed() {
414 queuingLock.lock();
415 try {
416 messagePushed.signal();
417 } finally {
418 queuingLock.unlock();
419 }
420 }
421 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700422
/**
 * Create a FlowPusher that uses the default number of processing
 * threads (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this.numberThread = DEFAULT_NUMBER_THREAD;
}
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800429
/**
 * Create a FlowPusher with the given number of processing threads.
 *
 * @param number_thread Number of threads to handle messages. A value
 *                      that is not positive selects DEFAULT_NUMBER_THREAD.
 */
public FlowPusher(int number_thread) {
    this.numberThread = (number_thread > 0) ? number_thread : DEFAULT_NUMBER_THREAD;
}
Naoki Shiota81dbe302013-11-21 15:35:38 -0800442
Ray Milkey8e5170e2014-04-02 12:09:55 -0700443 /**
444 * Set parameters needed for sending messages.
445 *
446 * @param context FloodlightContext used for sending messages.
447 * If null, FlowPusher uses default context.
448 * @param modContext FloodlightModuleContext used for acquiring
449 * ThreadPoolService and registering MessageListener.
450 * @param factory Factory object to create OFMessage objects.
451 * @param damper Message damper used for sending messages.
452 * If null, FlowPusher creates its own damper object.
453 */
454 public void init(FloodlightContext context,
455 FloodlightModuleContext modContext,
456 BasicFactory factory,
457 OFMessageDamper damper) {
458 this.context = context;
459 this.factory = factory;
460 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
461 IFloodlightProviderService flservice
462 = modContext.getServiceImpl(IFloodlightProviderService.class);
463 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800464
Ray Milkey8e5170e2014-04-02 12:09:55 -0700465 if (damper != null) {
466 messageDamper = damper;
467 } else {
468 // use default values
469 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
470 EnumSet.of(OFType.FLOW_MOD),
471 OFMESSAGE_DAMPER_TIMEOUT);
472 }
473 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800474
Ray Milkey8e5170e2014-04-02 12:09:55 -0700475 /**
476 * Begin processing queue.
477 */
478 public void start() {
479 if (factory == null) {
480 log.error("FlowPusher not yet initialized.");
481 return;
482 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800483
Ray Milkey8e5170e2014-04-02 12:09:55 -0700484 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700485 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700486 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800487
Ray Milkey8e5170e2014-04-02 12:09:55 -0700488 threadMap.put(i, thread);
489 thread.start();
490 }
491 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800492
Ray Milkey8e5170e2014-04-02 12:09:55 -0700493 @Override
494 public boolean suspend(IOFSwitch sw) {
495 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700496
Ray Milkey8e5170e2014-04-02 12:09:55 -0700497 if (queue == null) {
498 // create queue in case suspend is called before first message addition
499 queue = createQueueImpl(sw);
500 }
501
502 synchronized (queue) {
503 if (queue.state == QueueState.READY) {
504 queue.state = QueueState.SUSPENDED;
505 return true;
506 }
507 return false;
508 }
509 }
510
511 @Override
512 public boolean resume(IOFSwitch sw) {
513 SwitchQueue queue = getQueue(sw);
514
515 if (queue == null) {
516 log.error("No queue is attached to DPID : {}", sw.getId());
517 return false;
518 }
519
520 synchronized (queue) {
521 if (queue.state == QueueState.SUSPENDED) {
522 queue.state = QueueState.READY;
523
524 // Free the latch if queue has any messages
525 FlowPusherThread thread = getProcessingThread(sw);
526 if (queue.hasMessageToSend()) {
527 thread.notifyMessagePushed();
528 }
529 return true;
530 }
531 return false;
532 }
533 }
534
535 @Override
536 public QueueState getState(IOFSwitch sw) {
537 SwitchQueue queue = getQueue(sw);
538
539 if (queue == null) {
540 return QueueState.UNKNOWN;
541 }
542
543 return queue.state;
544 }
545
546 /**
547 * Stop processing queue and exit thread.
548 */
549 public void stop() {
550 if (threadMap == null) {
551 return;
552 }
553
554 for (FlowPusherThread t : threadMap.values()) {
555 t.interrupt();
556 }
557 }
558
559 @Override
560 public void setRate(IOFSwitch sw, long rate) {
561 SwitchQueue queue = getQueue(sw);
562 if (queue == null) {
563 queue = createQueueImpl(sw);
564 }
565
566 if (rate > 0) {
567 log.debug("rate for {} is set to {}", sw.getId(), rate);
568 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700569 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700570 }
571 }
572 }
573
574 @Override
575 public boolean createQueue(IOFSwitch sw) {
576 SwitchQueue queue = createQueueImpl(sw);
577
578 return (queue != null);
579 }
580
581 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
582 SwitchQueue queue = getQueue(sw);
583 if (queue != null) {
584 return queue;
585 }
586
587 FlowPusherThread proc = getProcessingThread(sw);
588 queue = new SwitchQueue();
589 queue.state = QueueState.READY;
590 proc.assignedQueues.put(sw, queue);
591
592 return queue;
593 }
594
595 @Override
596 public boolean deleteQueue(IOFSwitch sw) {
597 return deleteQueue(sw, false);
598 }
599
600 @Override
601 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
602 FlowPusherThread proc = getProcessingThread(sw);
603
604 if (forceStop) {
605 SwitchQueue queue = proc.assignedQueues.remove(sw);
606 if (queue == null) {
607 return false;
608 }
609 return true;
610 } else {
611 SwitchQueue queue = getQueue(sw);
612 if (queue == null) {
613 return false;
614 }
615 synchronized (queue) {
616 queue.toBeDeleted = true;
617 }
618 return true;
619 }
620 }
621
622 @Override
623 public boolean add(IOFSwitch sw, OFMessage msg) {
624 return add(sw, msg, MsgPriority.NORMAL);
625 }
626
627 @Override
628 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
629 return addMessageImpl(sw, msg, priority);
630 }
631
632 @Override
633 public void pushFlowEntries(
634 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
635 pushFlowEntries(entries, MsgPriority.NORMAL);
636 }
637
638 @Override
639 public void pushFlowEntries(
640 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
641
642 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
643 add(entry.first, entry.second, priority);
644 }
645 }
646
647 @Override
648 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
649 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
650 }
651
652 @Override
653 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
654 Collection<Pair<IOFSwitch, FlowEntry>> entries =
655 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
656
657 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
658 pushFlowEntries(entries, priority);
659 }
660
661 /**
662 * Create a message from FlowEntry and add it to the queue of the switch.
663 *
664 * @param sw Switch to which message is pushed.
665 * @param flowEntry FlowEntry object used for creating message.
666 * @return true if message is successfully added to a queue.
667 */
668 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
669 //
670 // Create the OpenFlow Flow Modification Entry to push
671 //
672 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
673 long cookie = flowEntry.flowEntryId().value();
674
675 short flowModCommand = OFFlowMod.OFPFC_ADD;
676 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
677 flowModCommand = OFFlowMod.OFPFC_ADD;
678 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
679 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
680 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
681 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
682 } else {
683 // Unknown user state. Ignore the entry
684 log.debug(
685 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
686 flowEntry.flowEntryId(),
687 flowEntry.flowEntryUserState());
688 return false;
689 }
690
691 //
692 // Fetch the match conditions.
693 //
694 // NOTE: The Flow matching conditions common for all Flow Entries are
695 // used ONLY if a Flow Entry does NOT have the corresponding matching
696 // condition set.
697 //
698 OFMatch match = new OFMatch();
699 match.setWildcards(OFMatch.OFPFW_ALL);
700 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
701
702 // Match the Incoming Port
703 Port matchInPort = flowEntryMatch.inPort();
704 if (matchInPort != null) {
705 match.setInputPort(matchInPort.value());
706 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
707 }
708
709 // Match the Source MAC address
710 MACAddress matchSrcMac = flowEntryMatch.srcMac();
711 if (matchSrcMac != null) {
712 match.setDataLayerSource(matchSrcMac.toString());
713 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
714 }
715
716 // Match the Destination MAC address
717 MACAddress matchDstMac = flowEntryMatch.dstMac();
718 if (matchDstMac != null) {
719 match.setDataLayerDestination(matchDstMac.toString());
720 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
721 }
722
723 // Match the Ethernet Frame Type
724 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
725 if (matchEthernetFrameType != null) {
726 match.setDataLayerType(matchEthernetFrameType);
727 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
728 }
729
730 // Match the VLAN ID
731 Short matchVlanId = flowEntryMatch.vlanId();
732 if (matchVlanId != null) {
733 match.setDataLayerVirtualLan(matchVlanId);
734 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
735 }
736
737 // Match the VLAN priority
738 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
739 if (matchVlanPriority != null) {
740 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
741 match.setWildcards(match.getWildcards()
742 & ~OFMatch.OFPFW_DL_VLAN_PCP);
743 }
744
745 // Match the Source IPv4 Network prefix
746 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
747 if (matchSrcIPv4Net != null) {
748 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
749 }
750
751 // Match the Destination IPv4 Network prefix
752 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
753 if (matchDstIPv4Net != null) {
754 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
755 }
756
757 // Match the IP protocol
758 Byte matchIpProto = flowEntryMatch.ipProto();
759 if (matchIpProto != null) {
760 match.setNetworkProtocol(matchIpProto);
761 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
762 }
763
764 // Match the IP ToS (DSCP field, 6 bits)
765 Byte matchIpToS = flowEntryMatch.ipToS();
766 if (matchIpToS != null) {
767 match.setNetworkTypeOfService(matchIpToS);
768 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
769 }
770
771 // Match the Source TCP/UDP port
772 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
773 if (matchSrcTcpUdpPort != null) {
774 match.setTransportSource(matchSrcTcpUdpPort);
775 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
776 }
777
778 // Match the Destination TCP/UDP port
779 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
780 if (matchDstTcpUdpPort != null) {
781 match.setTransportDestination(matchDstTcpUdpPort);
782 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
783 }
784
785 //
786 // Fetch the actions
787 //
788 Short actionOutputPort = null;
789 List<OFAction> openFlowActions = new ArrayList<OFAction>();
790 int actionsLen = 0;
791 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
792 //
793 for (FlowEntryAction action : flowEntryActions.actions()) {
794 ActionOutput actionOutput = action.actionOutput();
795 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
796 ActionSetVlanPriority actionSetVlanPriority = action
797 .actionSetVlanPriority();
798 ActionStripVlan actionStripVlan = action.actionStripVlan();
799 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
800 .actionSetEthernetSrcAddr();
801 ActionSetEthernetAddr actionSetEthernetDstAddr = action
802 .actionSetEthernetDstAddr();
803 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
804 .actionSetIPv4SrcAddr();
805 ActionSetIPv4Addr actionSetIPv4DstAddr = action
806 .actionSetIPv4DstAddr();
807 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
808 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
809 .actionSetTcpUdpSrcPort();
810 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
811 .actionSetTcpUdpDstPort();
812 ActionEnqueue actionEnqueue = action.actionEnqueue();
813
814 if (actionOutput != null) {
815 actionOutputPort = actionOutput.port().value();
816 // XXX: The max length is hard-coded for now
817 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
818 .value(), (short) 0xffff);
819 openFlowActions.add(ofa);
820 actionsLen += ofa.getLength();
821 }
822
823 if (actionSetVlanId != null) {
824 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
825 actionSetVlanId.vlanId());
826 openFlowActions.add(ofa);
827 actionsLen += ofa.getLength();
828 }
829
830 if (actionSetVlanPriority != null) {
831 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
832 actionSetVlanPriority.vlanPriority());
833 openFlowActions.add(ofa);
834 actionsLen += ofa.getLength();
835 }
836
837 if (actionStripVlan != null) {
838 if (actionStripVlan.stripVlan() == true) {
839 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
840 openFlowActions.add(ofa);
841 actionsLen += ofa.getLength();
842 }
843 }
844
845 if (actionSetEthernetSrcAddr != null) {
846 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
847 actionSetEthernetSrcAddr.addr().toBytes());
848 openFlowActions.add(ofa);
849 actionsLen += ofa.getLength();
850 }
851
852 if (actionSetEthernetDstAddr != null) {
853 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
854 actionSetEthernetDstAddr.addr().toBytes());
855 openFlowActions.add(ofa);
856 actionsLen += ofa.getLength();
857 }
858
859 if (actionSetIPv4SrcAddr != null) {
860 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
861 actionSetIPv4SrcAddr.addr().value());
862 openFlowActions.add(ofa);
863 actionsLen += ofa.getLength();
864 }
865
866 if (actionSetIPv4DstAddr != null) {
867 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
868 actionSetIPv4DstAddr.addr().value());
869 openFlowActions.add(ofa);
870 actionsLen += ofa.getLength();
871 }
872
873 if (actionSetIpToS != null) {
874 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
875 actionSetIpToS.ipToS());
876 openFlowActions.add(ofa);
877 actionsLen += ofa.getLength();
878 }
879
880 if (actionSetTcpUdpSrcPort != null) {
881 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
882 actionSetTcpUdpSrcPort.port());
883 openFlowActions.add(ofa);
884 actionsLen += ofa.getLength();
885 }
886
887 if (actionSetTcpUdpDstPort != null) {
888 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
889 actionSetTcpUdpDstPort.port());
890 openFlowActions.add(ofa);
891 actionsLen += ofa.getLength();
892 }
893
894 if (actionEnqueue != null) {
895 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
896 .value(), actionEnqueue.queueId());
897 openFlowActions.add(ofa);
898 actionsLen += ofa.getLength();
899 }
900 }
901
902 fm.setIdleTimeout((short) flowEntry.idleTimeout())
903 .setHardTimeout((short) flowEntry.hardTimeout())
904 .setPriority((short) flowEntry.priority())
905 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
906 .setCommand(flowModCommand).setMatch(match)
907 .setActions(openFlowActions)
908 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
909 fm.setOutPort(OFPort.OFPP_NONE.getValue());
910 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
911 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700912 if (actionOutputPort != null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700913 fm.setOutPort(actionOutputPort);
Ray Milkeyb29e6262014-04-09 16:02:14 -0700914 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700915 }
916
917 //
918 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
919 // permanent.
920 //
921 if ((flowEntry.idleTimeout() != 0) ||
922 (flowEntry.hardTimeout() != 0)) {
923 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
924 }
925
926 if (log.isTraceEnabled()) {
927 log.trace("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
928 , flowEntry.flowEntryUserState()
929 , sw.getStringId()
930 , flowEntry.flowEntryId()
931 , matchSrcMac
932 , matchDstMac
933 , matchInPort
934 , actionOutputPort
935 );
936 }
937
938 return addMessageImpl(sw, fm, priority);
939 }
940
941 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700942 * Add message to queue.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700943 *
944 * @param sw
945 * @param msg
946 * @param flowEntryId
947 * @return
948 */
949 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
950 FlowPusherThread thread = getProcessingThread(sw);
951
952 SwitchQueue queue = getQueue(sw);
953
954 // create queue at first addition of message
955 if (queue == null) {
956 queue = createQueueImpl(sw);
957 }
958
959 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
960
961 synchronized (queue) {
962 queue.add(entry, priority);
963 if (log.isTraceEnabled()) {
964 log.trace("Message is pushed : {}", entry.getOFMessage());
965 }
966 }
967
968 thread.notifyMessagePushed();
969
970 return true;
971 }
972
973 @Override
974 public OFBarrierReply barrier(IOFSwitch sw) {
975 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
976 if (future == null) {
977 return null;
978 }
979
980 try {
981 return future.get();
982 } catch (InterruptedException e) {
983 e.printStackTrace();
984 log.error("InterruptedException: {}", e);
985 return null;
986 } catch (ExecutionException e) {
987 e.printStackTrace();
988 log.error("ExecutionException: {}", e);
989 return null;
990 }
991 }
992
993 @Override
994 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
995 // TODO creation of message and future should be moved to OFSwitchImpl
996
997 if (sw == null) {
998 return null;
999 }
1000
1001 OFBarrierRequest msg = createBarrierRequest(sw);
1002
1003 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
1004 barrierFutures.put(BarrierInfo.create(sw, msg), future);
1005
1006 addMessageImpl(sw, msg, MsgPriority.NORMAL);
1007
1008 return future;
1009 }
1010
1011 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
1012 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
1013 msg.setXid(sw.getNextTransactionId());
1014
1015 return msg;
1016 }
1017
1018 /**
1019 * Get a queue attached to a switch.
1020 *
1021 * @param sw Switch object
1022 * @return Queue object
1023 */
1024 protected SwitchQueue getQueue(IOFSwitch sw) {
1025 if (sw == null) {
1026 return null;
1027 }
1028
1029 FlowPusherThread th = getProcessingThread(sw);
1030 if (th == null) {
1031 return null;
1032 }
1033
1034 return th.assignedQueues.get(sw);
1035 }
1036
1037 /**
1038 * Get a hash value correspondent to a switch.
1039 *
1040 * @param sw Switch object
1041 * @return Hash value
1042 */
1043 protected long getHash(IOFSwitch sw) {
1044 // This code assumes DPID is sequentially assigned.
1045 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -07001046 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -07001047 }
1048
1049 /**
1050 * Get a Thread object which processes the queue attached to a switch.
1051 *
1052 * @param sw Switch object
1053 * @return Thread object
1054 */
1055 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
1056 long hash = getHash(sw);
1057
1058 return threadMap.get(hash);
1059 }
1060
1061 @Override
1062 public String getName() {
1063 return "flowpusher";
1064 }
1065
1066 @Override
1067 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1068 return false;
1069 }
1070
1071 @Override
1072 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1073 return false;
1074 }
1075
1076 @Override
1077 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
1078 if (log.isTraceEnabled()) {
1079 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1080 }
1081
1082 if (msg.getType() != OFType.BARRIER_REPLY) {
1083 log.error("Unexpected reply message : {}", msg.getType());
1084 return Command.CONTINUE;
1085 }
1086
1087 OFBarrierReply reply = (OFBarrierReply) msg;
1088 BarrierInfo info = BarrierInfo.create(sw, reply);
1089
1090 // Deliver future if exists
1091 OFBarrierReplyFuture future = barrierFutures.get(info);
1092 if (future != null) {
1093 future.deliverFuture(sw, msg);
1094 barrierFutures.remove(info);
1095 }
1096
1097 return Command.CONTINUE;
1098 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001099}