blob: 1bf9ee65d0bc68c2f1de939b4afa6d1f18798f7a [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080014import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070015import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080016import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070017import java.util.concurrent.locks.Condition;
18import java.util.concurrent.locks.Lock;
19import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070021import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080022import net.floodlightcontroller.core.IFloodlightProviderService;
23import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070024import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.internal.OFMessageFuture;
26import net.floodlightcontroller.core.module.FloodlightModuleContext;
27import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080028import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080029import net.floodlightcontroller.util.OFMessageDamper;
Jonathan Hart23701d12014-04-03 10:45:48 -070030import net.onrc.onos.core.util.FlowEntry;
31import net.onrc.onos.core.util.FlowEntryAction;
Jonathan Harta99ec672014-04-03 11:30:34 -070032import net.onrc.onos.core.util.FlowEntryAction.ActionEnqueue;
33import net.onrc.onos.core.util.FlowEntryAction.ActionOutput;
34import net.onrc.onos.core.util.FlowEntryAction.ActionSetEthernetAddr;
35import net.onrc.onos.core.util.FlowEntryAction.ActionSetIPv4Addr;
36import net.onrc.onos.core.util.FlowEntryAction.ActionSetIpToS;
37import net.onrc.onos.core.util.FlowEntryAction.ActionSetTcpUdpPort;
38import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanId;
39import net.onrc.onos.core.util.FlowEntryAction.ActionSetVlanPriority;
40import net.onrc.onos.core.util.FlowEntryAction.ActionStripVlan;
Jonathan Hart23701d12014-04-03 10:45:48 -070041import net.onrc.onos.core.util.FlowEntryActions;
42import net.onrc.onos.core.util.FlowEntryMatch;
43import net.onrc.onos.core.util.FlowEntryUserState;
44import net.onrc.onos.core.util.IPv4Net;
45import net.onrc.onos.core.util.Pair;
46import net.onrc.onos.core.util.Port;
Jonathan Harta99ec672014-04-03 11:30:34 -070047
48import org.openflow.protocol.OFBarrierReply;
49import org.openflow.protocol.OFBarrierRequest;
50import org.openflow.protocol.OFFlowMod;
51import org.openflow.protocol.OFMatch;
52import org.openflow.protocol.OFMessage;
53import org.openflow.protocol.OFPacketOut;
54import org.openflow.protocol.OFPort;
55import org.openflow.protocol.OFType;
56import org.openflow.protocol.action.OFAction;
57import org.openflow.protocol.action.OFActionDataLayerDestination;
58import org.openflow.protocol.action.OFActionDataLayerSource;
59import org.openflow.protocol.action.OFActionEnqueue;
60import org.openflow.protocol.action.OFActionNetworkLayerDestination;
61import org.openflow.protocol.action.OFActionNetworkLayerSource;
62import org.openflow.protocol.action.OFActionNetworkTypeOfService;
63import org.openflow.protocol.action.OFActionOutput;
64import org.openflow.protocol.action.OFActionStripVirtualLan;
65import org.openflow.protocol.action.OFActionTransportLayerDestination;
66import org.openflow.protocol.action.OFActionTransportLayerSource;
67import org.openflow.protocol.action.OFActionVirtualLanIdentifier;
68import org.openflow.protocol.action.OFActionVirtualLanPriorityCodePoint;
69import org.openflow.protocol.factory.BasicFactory;
70import org.slf4j.Logger;
71import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072
Jonathan Hartc00f5c22014-06-10 15:14:40 -070073import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
74
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070075/**
Naoki Shiotab485d412013-11-26 12:04:19 -080076 * FlowPusher is a implementation of FlowPusherService.
77 * FlowPusher assigns one message queue instance for each one switch.
78 * Number of message processing threads is configurable by constructor, and
Ray Milkey8e5170e2014-04-02 12:09:55 -070079 * one thread can handle multiple message queues. Each queue will be assigned to
Naoki Shiotab485d412013-11-26 12:04:19 -080080 * a thread according to hash function defined by getHash().
81 * Each processing thread reads messages from queues and sends it to switches
82 * in round-robin. Processing thread also calculates rate of sending to suppress
83 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070084 *
Ray Milkey8e5170e2014-04-02 12:09:55 -070085 * @author Naoki Shiota
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070086 */
Ray Milkey1584ec82014-04-10 11:58:30 -070087public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070088 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Naoki Shiota8df97bc2014-03-13 18:42:23 -070089 protected static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080090
Naoki Shiota7d0cf272013-11-05 10:18:12 -080091 // TODO: Values copied from elsewhere (class LearningSwitch).
92 // The local copy should go away!
93 //
Yuta HIGUCHI7930d8a2014-06-09 11:32:37 -070094 protected static final int OFMESSAGE_DAMPER_CAPACITY = 10000; // TODO: find sweet spot
Ray Milkey8e5170e2014-04-02 12:09:55 -070095 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
96
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080097 // Number of messages sent to switch at once
98 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080099
Ray Milkey8e5170e2014-04-02 12:09:55 -0700100 private static class SwitchQueueEntry {
101 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700102
Ray Milkey8e5170e2014-04-02 12:09:55 -0700103 public SwitchQueueEntry(OFMessage msg) {
104 this.msg = msg;
105 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700106
Ray Milkey8e5170e2014-04-02 12:09:55 -0700107 public OFMessage getOFMessage() {
108 return msg;
109 }
110 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700111
Ray Milkey8e5170e2014-04-02 12:09:55 -0700112 /**
113 * SwitchQueue represents message queue attached to a switch.
114 * This consists of queue itself and variables used for limiting sending rate.
115 *
116 * @author Naoki Shiota
117 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -0700118 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -0700119 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700120 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800121
Ray Milkey8e5170e2014-04-02 12:09:55 -0700122 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Ray Milkey2476cac2014-04-08 11:03:21 -0700123 long maxRate = 0; // 0 indicates no limitation
124 long lastSentTime = 0;
125 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -0700126
Ray Milkey8e5170e2014-04-02 12:09:55 -0700127 // "To be deleted" flag
128 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800129
Ray Milkey8e5170e2014-04-02 12:09:55 -0700130 SwitchQueue() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700131 rawQueues = new ArrayList<Queue<SwitchQueueEntry>>(
Ray Milkey8e5170e2014-04-02 12:09:55 -0700132 MsgPriority.values().length);
133 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700134 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700135 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800136
Ray Milkey8e5170e2014-04-02 12:09:55 -0700137 state = QueueState.READY;
138 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800139
Ray Milkey8e5170e2014-04-02 12:09:55 -0700140 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700141 * Check if sending rate is within the rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700142 *
143 * @param current Current time
144 * @return true if within the rate
145 */
146 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700147 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700148 // no limitation
149 return true;
150 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800151
Ray Milkey2476cac2014-04-08 11:03:21 -0700152 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700153 return false;
154 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800155
Ray Milkey8e5170e2014-04-02 12:09:55 -0700156 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700157 long rate = lastSentSize / (current - lastSentTime);
158 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700159 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800160
Ray Milkey8e5170e2014-04-02 12:09:55 -0700161 /**
162 * Log time and size of last sent data.
163 *
164 * @param current Time to be sent.
165 * @param size Size of sent data (in bytes).
166 */
167 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700168 lastSentTime = current;
169 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700170 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700171
Ray Milkey8e5170e2014-04-02 12:09:55 -0700172 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
173 Queue<SwitchQueueEntry> queue = getQueue(priority);
174 if (queue == null) {
175 log.error("Unexpected priority : ", priority);
176 return false;
177 }
178 return queue.add(entry);
179 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800180
Ray Milkey8e5170e2014-04-02 12:09:55 -0700181 /**
182 * Poll single appropriate entry object according to QueueState.
183 *
184 * @return Entry object.
185 */
186 SwitchQueueEntry poll() {
187 switch (state) {
188 case READY: {
Ray Milkey2476cac2014-04-08 11:03:21 -0700189 for (int i = 0; i < rawQueues.size(); ++i) {
190 SwitchQueueEntry entry = rawQueues.get(i).poll();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700191 if (entry != null) {
192 return entry;
193 }
194 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800195
Ray Milkey8e5170e2014-04-02 12:09:55 -0700196 return null;
197 }
198 case SUSPENDED: {
199 // Only polling from high priority queue
200 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
201 return entry;
202 }
203 default:
204 log.error("Unexpected QueueState : ", state);
205 return null;
206 }
207 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800208
Ray Milkey8e5170e2014-04-02 12:09:55 -0700209 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700210 * Check if this object has any messages in the queues to be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700211 *
212 * @return True if there are some messages to be sent.
213 */
214 boolean hasMessageToSend() {
215 switch (state) {
216 case READY:
Ray Milkey2476cac2014-04-08 11:03:21 -0700217 for (Queue<SwitchQueueEntry> queue : rawQueues) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700218 if (!queue.isEmpty()) {
219 return true;
220 }
221 }
222 break;
223 case SUSPENDED:
224 // Only checking high priority queue
225 return (!getQueue(MsgPriority.HIGH).isEmpty());
226 default:
227 log.error("Unexpected QueueState : ", state);
228 return false;
229 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800230
Ray Milkey8e5170e2014-04-02 12:09:55 -0700231 return false;
232 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800233
Ray Milkey8e5170e2014-04-02 12:09:55 -0700234 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700235 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700236 }
237 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800238
Ray Milkey8e5170e2014-04-02 12:09:55 -0700239 /**
240 * BarrierInfo holds information to specify barrier message sent to switch.
241 *
242 * @author Naoki
243 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700244 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700245 final long dpid;
246 final int xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800247
Ray Milkey8e5170e2014-04-02 12:09:55 -0700248 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
249 return new BarrierInfo(sw.getId(), req.getXid());
250 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800251
Ray Milkey8e5170e2014-04-02 12:09:55 -0700252 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
253 return new BarrierInfo(sw.getId(), rpy.getXid());
254 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800255
Ray Milkey8e5170e2014-04-02 12:09:55 -0700256 private BarrierInfo(long dpid, int xid) {
257 this.dpid = dpid;
258 this.xid = xid;
259 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800260
Ray Milkey8e5170e2014-04-02 12:09:55 -0700261 // Auto generated code by Eclipse
262 @Override
263 public int hashCode() {
264 final int prime = 31;
265 int result = 1;
266 result = prime * result + (int) (dpid ^ (dpid >>> 32));
267 result = prime * result + xid;
268 return result;
269 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800270
Ray Milkey8e5170e2014-04-02 12:09:55 -0700271 @Override
272 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700273 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700274 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700275 }
276 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700277 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700278 }
279 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700280 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700281 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800282
Ray Milkey8e5170e2014-04-02 12:09:55 -0700283 BarrierInfo other = (BarrierInfo) obj;
284 return (this.dpid == other.dpid) && (this.xid == other.xid);
285 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800286
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800287
Ray Milkey8e5170e2014-04-02 12:09:55 -0700288 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800289
Ray Milkey8e5170e2014-04-02 12:09:55 -0700290 private OFMessageDamper messageDamper = null;
291 private IThreadPoolService threadPool = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800292
Ray Milkey8e5170e2014-04-02 12:09:55 -0700293 private FloodlightContext context = null;
294 private BasicFactory factory = null;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800295
Ray Milkey8e5170e2014-04-02 12:09:55 -0700296 // Map of threads versus dpid
297 private Map<Long, FlowPusherThread> threadMap = null;
298 // Map from (DPID and transaction ID) to Future objects.
299 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
300 = new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800301
Ray Milkey2476cac2014-04-08 11:03:21 -0700302 private int numberThread;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800303
Ray Milkey8e5170e2014-04-02 12:09:55 -0700304 /**
305 * Main thread that reads messages from queues and sends them to switches.
306 *
307 * @author Naoki Shiota
308 */
309 private class FlowPusherThread extends Thread {
310 private Map<IOFSwitch, SwitchQueue> assignedQueues
311 = new ConcurrentHashMap<IOFSwitch, SwitchQueue>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800312
Ray Milkey8e5170e2014-04-02 12:09:55 -0700313 final Lock queuingLock = new ReentrantLock();
314 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800315
Ray Milkey8e5170e2014-04-02 12:09:55 -0700316 @Override
317 public void run() {
318 this.setName("FlowPusherThread " + this.getId());
319 while (true) {
320 while (!queuesHasMessageToSend()) {
321 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800322
Ray Milkey8e5170e2014-04-02 12:09:55 -0700323 try {
324 // wait for message pushed to queue
325 messagePushed.await();
326 } catch (InterruptedException e) {
327 // Interrupted to be shut down (not an error)
328 log.debug("FlowPusherThread is interrupted");
329 return;
330 } finally {
331 queuingLock.unlock();
332 }
333 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800334
Ray Milkey8e5170e2014-04-02 12:09:55 -0700335 // for safety of concurrent access, copy set of key objects
336 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(assignedQueues.size());
337 for (IOFSwitch sw : assignedQueues.keySet()) {
338 keys.add(sw);
339 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800340
Ray Milkey8e5170e2014-04-02 12:09:55 -0700341 for (IOFSwitch sw : keys) {
342 SwitchQueue queue = assignedQueues.get(sw);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800343
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700344 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700345 continue;
346 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800347
Ray Milkey8e5170e2014-04-02 12:09:55 -0700348 synchronized (queue) {
349 processQueue(sw, queue, MAX_MESSAGE_SEND);
350 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
351 // remove queue if flagged to be.
352 assignedQueues.remove(sw);
353 }
354 }
355 }
356 }
357 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800358
Ray Milkey8e5170e2014-04-02 12:09:55 -0700359 /**
360 * Read messages from queue and send them to the switch.
361 * If number of messages excess the limit, stop sending messages.
362 *
363 * @param sw Switch to which messages will be sent.
364 * @param queue Queue of messages.
Ray Milkey9526d6f2014-04-10 14:54:15 -0700365 * @param maxMsg Limitation of number of messages to be sent. If set to 0,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700366 * all messages in queue will be sent.
367 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700368 private void processQueue(IOFSwitch sw, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700369 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700370 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700371 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800372
Ray Milkey2476cac2014-04-08 11:03:21 -0700373 if (queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700374 int i = 0;
375 while (queue.hasMessageToSend()) {
376 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700377 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700378 break;
379 }
380 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800381
Ray Milkey8e5170e2014-04-02 12:09:55 -0700382 SwitchQueueEntry queueEntry;
383 synchronized (queue) {
384 queueEntry = queue.poll();
385 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800386
Ray Milkey8e5170e2014-04-02 12:09:55 -0700387 OFMessage msg = queueEntry.getOFMessage();
388 try {
389 messageDamper.write(sw, msg, context);
390 if (log.isTraceEnabled()) {
391 log.trace("Pusher sends message : {}", msg);
392 }
393 size += msg.getLength();
394 } catch (IOException e) {
395 e.printStackTrace();
396 log.error("Exception in sending message ({}) : {}", msg, e);
397 }
398 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800399
Ray Milkey8e5170e2014-04-02 12:09:55 -0700400 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700401 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700402 }
403 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800404
Ray Milkey8e5170e2014-04-02 12:09:55 -0700405 private boolean queuesHasMessageToSend() {
406 for (SwitchQueue queue : assignedQueues.values()) {
407 if (queue.hasMessageToSend()) {
408 return true;
409 }
410 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700411
Ray Milkey8e5170e2014-04-02 12:09:55 -0700412 return false;
413 }
Naoki Shiota05334692014-03-18 16:06:36 -0700414
Ray Milkey8e5170e2014-04-02 12:09:55 -0700415 private void notifyMessagePushed() {
416 queuingLock.lock();
417 try {
418 messagePushed.signal();
419 } finally {
420 queuingLock.unlock();
421 }
422 }
423 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700424
Ray Milkey8e5170e2014-04-02 12:09:55 -0700425 /**
426 * Initialize object with one thread.
427 */
428 public FlowPusher() {
Ray Milkey2476cac2014-04-08 11:03:21 -0700429 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700430 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800431
Ray Milkey8e5170e2014-04-02 12:09:55 -0700432 /**
433 * Initialize object with threads of given number.
434 *
Ray Milkey9526d6f2014-04-10 14:54:15 -0700435 * @param numberThreadValue Number of threads to handle messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700436 */
Ray Milkey9526d6f2014-04-10 14:54:15 -0700437 public FlowPusher(int numberThreadValue) {
438 if (numberThreadValue > 0) {
439 numberThread = numberThreadValue;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700440 } else {
Ray Milkey9526d6f2014-04-10 14:54:15 -0700441 numberThread = DEFAULT_NUMBER_THREAD;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700442 }
443 }
Naoki Shiota81dbe302013-11-21 15:35:38 -0800444
Ray Milkey8e5170e2014-04-02 12:09:55 -0700445 /**
446 * Set parameters needed for sending messages.
447 *
Ray Milkey5df613b2014-04-15 10:50:56 -0700448 * @param floodlightContext FloodlightContext used for sending messages.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700449 * If null, FlowPusher uses default context.
450 * @param modContext FloodlightModuleContext used for acquiring
451 * ThreadPoolService and registering MessageListener.
Ray Milkey5df613b2014-04-15 10:50:56 -0700452 * @param basicFactory Factory object to create OFMessage objects.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700453 * @param damper Message damper used for sending messages.
454 * If null, FlowPusher creates its own damper object.
455 */
Ray Milkey5df613b2014-04-15 10:50:56 -0700456 public void init(FloodlightContext floodlightContext,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700457 FloodlightModuleContext modContext,
Ray Milkey5df613b2014-04-15 10:50:56 -0700458 BasicFactory basicFactory,
Ray Milkey8e5170e2014-04-02 12:09:55 -0700459 OFMessageDamper damper) {
Ray Milkey5df613b2014-04-15 10:50:56 -0700460 context = floodlightContext;
461 factory = basicFactory;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700462 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
463 IFloodlightProviderService flservice
464 = modContext.getServiceImpl(IFloodlightProviderService.class);
465 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800466
Ray Milkey8e5170e2014-04-02 12:09:55 -0700467 if (damper != null) {
468 messageDamper = damper;
469 } else {
470 // use default values
471 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
472 EnumSet.of(OFType.FLOW_MOD),
473 OFMESSAGE_DAMPER_TIMEOUT);
474 }
475 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800476
Ray Milkey8e5170e2014-04-02 12:09:55 -0700477 /**
478 * Begin processing queue.
479 */
480 public void start() {
481 if (factory == null) {
482 log.error("FlowPusher not yet initialized.");
483 return;
484 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800485
Ray Milkey8e5170e2014-04-02 12:09:55 -0700486 threadMap = new HashMap<Long, FlowPusherThread>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700487 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700488 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800489
Ray Milkey8e5170e2014-04-02 12:09:55 -0700490 threadMap.put(i, thread);
491 thread.start();
492 }
493 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800494
Ray Milkey8e5170e2014-04-02 12:09:55 -0700495 @Override
496 public boolean suspend(IOFSwitch sw) {
497 SwitchQueue queue = getQueue(sw);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700498
Ray Milkey8e5170e2014-04-02 12:09:55 -0700499 if (queue == null) {
500 // create queue in case suspend is called before first message addition
501 queue = createQueueImpl(sw);
502 }
503
504 synchronized (queue) {
505 if (queue.state == QueueState.READY) {
506 queue.state = QueueState.SUSPENDED;
507 return true;
508 }
509 return false;
510 }
511 }
512
513 @Override
514 public boolean resume(IOFSwitch sw) {
515 SwitchQueue queue = getQueue(sw);
516
517 if (queue == null) {
518 log.error("No queue is attached to DPID : {}", sw.getId());
519 return false;
520 }
521
522 synchronized (queue) {
523 if (queue.state == QueueState.SUSPENDED) {
524 queue.state = QueueState.READY;
525
526 // Free the latch if queue has any messages
527 FlowPusherThread thread = getProcessingThread(sw);
528 if (queue.hasMessageToSend()) {
529 thread.notifyMessagePushed();
530 }
531 return true;
532 }
533 return false;
534 }
535 }
536
537 @Override
538 public QueueState getState(IOFSwitch sw) {
539 SwitchQueue queue = getQueue(sw);
540
541 if (queue == null) {
542 return QueueState.UNKNOWN;
543 }
544
545 return queue.state;
546 }
547
548 /**
549 * Stop processing queue and exit thread.
550 */
551 public void stop() {
552 if (threadMap == null) {
553 return;
554 }
555
556 for (FlowPusherThread t : threadMap.values()) {
557 t.interrupt();
558 }
559 }
560
561 @Override
562 public void setRate(IOFSwitch sw, long rate) {
563 SwitchQueue queue = getQueue(sw);
564 if (queue == null) {
565 queue = createQueueImpl(sw);
566 }
567
568 if (rate > 0) {
569 log.debug("rate for {} is set to {}", sw.getId(), rate);
570 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700571 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700572 }
573 }
574 }
575
576 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700577 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
578 justification = "Future versions of createQueueImpl() might return null")
Ray Milkey8e5170e2014-04-02 12:09:55 -0700579 public boolean createQueue(IOFSwitch sw) {
580 SwitchQueue queue = createQueueImpl(sw);
581
582 return (queue != null);
583 }
584
585 protected SwitchQueue createQueueImpl(IOFSwitch sw) {
586 SwitchQueue queue = getQueue(sw);
587 if (queue != null) {
588 return queue;
589 }
590
591 FlowPusherThread proc = getProcessingThread(sw);
592 queue = new SwitchQueue();
593 queue.state = QueueState.READY;
594 proc.assignedQueues.put(sw, queue);
595
596 return queue;
597 }
598
599 @Override
600 public boolean deleteQueue(IOFSwitch sw) {
601 return deleteQueue(sw, false);
602 }
603
604 @Override
605 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
606 FlowPusherThread proc = getProcessingThread(sw);
607
608 if (forceStop) {
609 SwitchQueue queue = proc.assignedQueues.remove(sw);
610 if (queue == null) {
611 return false;
612 }
613 return true;
614 } else {
615 SwitchQueue queue = getQueue(sw);
616 if (queue == null) {
617 return false;
618 }
619 synchronized (queue) {
620 queue.toBeDeleted = true;
621 }
622 return true;
623 }
624 }
625
626 @Override
627 public boolean add(IOFSwitch sw, OFMessage msg) {
628 return add(sw, msg, MsgPriority.NORMAL);
629 }
630
631 @Override
632 public boolean add(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
633 return addMessageImpl(sw, msg, priority);
634 }
635
636 @Override
637 public void pushFlowEntries(
638 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
639 pushFlowEntries(entries, MsgPriority.NORMAL);
640 }
641
642 @Override
643 public void pushFlowEntries(
644 Collection<Pair<IOFSwitch, FlowEntry>> entries, MsgPriority priority) {
645
646 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Sho SHIMIZU26d77892014-06-10 11:07:06 -0700647 add(entry.getFirst(), entry.getSecond(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700648 }
649 }
650
651 @Override
652 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
653 pushFlowEntry(sw, flowEntry, MsgPriority.NORMAL);
654 }
655
656 @Override
657 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
658 Collection<Pair<IOFSwitch, FlowEntry>> entries =
659 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
660
661 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
662 pushFlowEntries(entries, priority);
663 }
664
665 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700666 * Fetch the match conditions.
667 * NOTE: The Flow matching conditions common for all Flow Entries are
668 * used ONLY if a Flow Entry does NOT have the corresponding matching
669 * condition set.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700670 *
Ray Milkeyddcd4922014-04-17 11:21:20 -0700671 * @param flowEntryMatch Flow entry to create a matcher for
672 * @return open flow matcher for the given values
Ray Milkey8e5170e2014-04-02 12:09:55 -0700673 */
Ray Milkeyddcd4922014-04-17 11:21:20 -0700674 private OFMatch computeMatch(FlowEntryMatch flowEntryMatch) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700675 OFMatch match = new OFMatch();
676 match.setWildcards(OFMatch.OFPFW_ALL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700677
678 // Match the Incoming Port
679 Port matchInPort = flowEntryMatch.inPort();
680 if (matchInPort != null) {
681 match.setInputPort(matchInPort.value());
682 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
683 }
684
685 // Match the Source MAC address
686 MACAddress matchSrcMac = flowEntryMatch.srcMac();
687 if (matchSrcMac != null) {
688 match.setDataLayerSource(matchSrcMac.toString());
689 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
690 }
691
692 // Match the Destination MAC address
693 MACAddress matchDstMac = flowEntryMatch.dstMac();
694 if (matchDstMac != null) {
695 match.setDataLayerDestination(matchDstMac.toString());
696 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
697 }
698
699 // Match the Ethernet Frame Type
700 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
701 if (matchEthernetFrameType != null) {
702 match.setDataLayerType(matchEthernetFrameType);
703 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
704 }
705
706 // Match the VLAN ID
707 Short matchVlanId = flowEntryMatch.vlanId();
708 if (matchVlanId != null) {
709 match.setDataLayerVirtualLan(matchVlanId);
710 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
711 }
712
713 // Match the VLAN priority
714 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
715 if (matchVlanPriority != null) {
716 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
717 match.setWildcards(match.getWildcards()
718 & ~OFMatch.OFPFW_DL_VLAN_PCP);
719 }
720
721 // Match the Source IPv4 Network prefix
722 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
723 if (matchSrcIPv4Net != null) {
724 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
725 }
726
727 // Natch the Destination IPv4 Network prefix
728 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
729 if (matchDstIPv4Net != null) {
730 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
731 }
732
733 // Match the IP protocol
734 Byte matchIpProto = flowEntryMatch.ipProto();
735 if (matchIpProto != null) {
736 match.setNetworkProtocol(matchIpProto);
737 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
738 }
739
740 // Match the IP ToS (DSCP field, 6 bits)
741 Byte matchIpToS = flowEntryMatch.ipToS();
742 if (matchIpToS != null) {
743 match.setNetworkTypeOfService(matchIpToS);
744 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
745 }
746
747 // Match the Source TCP/UDP port
748 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
749 if (matchSrcTcpUdpPort != null) {
750 match.setTransportSource(matchSrcTcpUdpPort);
751 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
752 }
753
754 // Match the Destination TCP/UDP port
755 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
756 if (matchDstTcpUdpPort != null) {
757 match.setTransportDestination(matchDstTcpUdpPort);
758 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
759 }
760
Ray Milkeyddcd4922014-04-17 11:21:20 -0700761 return match;
762 }
763
764
765 /**
766 * Wrapper object to hold a port number. Used to pass around output ports.
767 */
768 private static class OutputPort {
769 private Short portNumber;
770 }
771
772 /**
773 * Process a flow action entry, putting the resulting flow
774 * actions into a list. Will also set the actionOutputPort
775 * if one is encountered while processing an action.
776 *
777 * @param action Flow Entry Action to process
778 * @param openFlowActions actions to perform get added to this list
779 * @param actionOutputPort this will get set if an action output
780 * port is found
781 */
782 private void processAction(final FlowEntryAction action,
783 final List<OFAction> openFlowActions,
784 final OutputPort actionOutputPort) {
785 ActionOutput actionOutput = action.actionOutput();
786 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
787 ActionSetVlanPriority actionSetVlanPriority = action
788 .actionSetVlanPriority();
789 ActionStripVlan actionStripVlan = action.actionStripVlan();
790 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
791 .actionSetEthernetSrcAddr();
792 ActionSetEthernetAddr actionSetEthernetDstAddr = action
793 .actionSetEthernetDstAddr();
794 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
795 .actionSetIPv4SrcAddr();
796 ActionSetIPv4Addr actionSetIPv4DstAddr = action
797 .actionSetIPv4DstAddr();
798 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
799 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
800 .actionSetTcpUdpSrcPort();
801 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
802 .actionSetTcpUdpDstPort();
803 ActionEnqueue actionEnqueue = action.actionEnqueue();
804
805 if (actionOutput != null) {
806 actionOutputPort.portNumber = actionOutput.port().value();
807 // XXX: The max length is hard-coded for now
808 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
809 .value(), (short) 0xffff);
810 openFlowActions.add(ofa);
811 }
812
813 if (actionSetVlanId != null) {
814 OFActionVirtualLanIdentifier ofa =
815 new OFActionVirtualLanIdentifier(actionSetVlanId.vlanId());
816 openFlowActions.add(ofa);
817 }
818
819 if (actionSetVlanPriority != null) {
820 OFActionVirtualLanPriorityCodePoint ofa =
821 new OFActionVirtualLanPriorityCodePoint(actionSetVlanPriority.vlanPriority());
822 openFlowActions.add(ofa);
823 }
824
825 if (actionStripVlan != null) {
826 if (actionStripVlan.stripVlan()) {
827 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
828 openFlowActions.add(ofa);
829 }
830 }
831
832 if (actionSetEthernetSrcAddr != null) {
833 OFActionDataLayerSource ofa =
834 new OFActionDataLayerSource(actionSetEthernetSrcAddr.addr().toBytes());
835 openFlowActions.add(ofa);
836 }
837
838 if (actionSetEthernetDstAddr != null) {
839 OFActionDataLayerDestination ofa =
840 new OFActionDataLayerDestination(actionSetEthernetDstAddr.addr().toBytes());
841 openFlowActions.add(ofa);
842 }
843
844 if (actionSetIPv4SrcAddr != null) {
845 OFActionNetworkLayerSource ofa =
846 new OFActionNetworkLayerSource(actionSetIPv4SrcAddr.addr().value());
847 openFlowActions.add(ofa);
848 }
849
850 if (actionSetIPv4DstAddr != null) {
851 OFActionNetworkLayerDestination ofa =
852 new OFActionNetworkLayerDestination(actionSetIPv4DstAddr.addr().value());
853 openFlowActions.add(ofa);
854 }
855
856 if (actionSetIpToS != null) {
857 OFActionNetworkTypeOfService ofa =
858 new OFActionNetworkTypeOfService(actionSetIpToS.ipToS());
859 openFlowActions.add(ofa);
860 }
861
862 if (actionSetTcpUdpSrcPort != null) {
863 OFActionTransportLayerSource ofa =
864 new OFActionTransportLayerSource(actionSetTcpUdpSrcPort.port());
865 openFlowActions.add(ofa);
866 }
867
868 if (actionSetTcpUdpDstPort != null) {
869 OFActionTransportLayerDestination ofa =
870 new OFActionTransportLayerDestination(actionSetTcpUdpDstPort.port());
871 openFlowActions.add(ofa);
872 }
873
874 if (actionEnqueue != null) {
875 OFActionEnqueue ofa =
876 new OFActionEnqueue(actionEnqueue.port().value(), actionEnqueue.queueId());
877 openFlowActions.add(ofa);
878 }
879 }
880
881
882 /**
883 * Create a message from FlowEntry and add it to the queue of the switch.
884 *
885 * @param sw Switch to which message is pushed.
886 * @param flowEntry FlowEntry object used for creating message.
887 * @return true if message is successfully added to a queue.
888 */
889 private boolean add(IOFSwitch sw, FlowEntry flowEntry, MsgPriority priority) {
890 //
891 // Create the OpenFlow Flow Modification Entry to push
892 //
893 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
894 long cookie = flowEntry.flowEntryId().value();
895
896 short flowModCommand = OFFlowMod.OFPFC_ADD;
897 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
898 flowModCommand = OFFlowMod.OFPFC_ADD;
899 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
900 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
901 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
902 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
903 } else {
904 // Unknown user state. Ignore the entry
905 log.debug(
906 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
907 flowEntry.flowEntryId(),
908 flowEntry.flowEntryUserState());
909 return false;
910 }
911
912 final FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
913 final OFMatch match = computeMatch(flowEntryMatch);
914
915 final Port matchInPort = flowEntryMatch.inPort();
916 final MACAddress matchSrcMac = flowEntryMatch.srcMac();
917 final MACAddress matchDstMac = flowEntryMatch.dstMac();
918
Ray Milkey8e5170e2014-04-02 12:09:55 -0700919 //
920 // Fetch the actions
921 //
Ray Milkeyddcd4922014-04-17 11:21:20 -0700922 final List<OFAction> openFlowActions = new ArrayList<OFAction>();
923 final FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700924 //
Ray Milkeyddcd4922014-04-17 11:21:20 -0700925 final OutputPort actionOutputPort = new OutputPort();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700926 for (FlowEntryAction action : flowEntryActions.actions()) {
Ray Milkeyddcd4922014-04-17 11:21:20 -0700927 processAction(action, openFlowActions, actionOutputPort);
928 }
929 int actionsLen = 0;
930 for (OFAction ofa : openFlowActions) {
931 actionsLen += ofa.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700932 }
933
934 fm.setIdleTimeout((short) flowEntry.idleTimeout())
935 .setHardTimeout((short) flowEntry.hardTimeout())
936 .setPriority((short) flowEntry.priority())
937 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
938 .setCommand(flowModCommand).setMatch(match)
939 .setActions(openFlowActions)
940 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
941 fm.setOutPort(OFPort.OFPP_NONE.getValue());
942 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
943 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
Ray Milkeyddcd4922014-04-17 11:21:20 -0700944 if (actionOutputPort.portNumber != null) {
945 fm.setOutPort(actionOutputPort.portNumber);
Ray Milkeyb29e6262014-04-09 16:02:14 -0700946 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700947 }
948
949 //
950 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
951 // permanent.
952 //
953 if ((flowEntry.idleTimeout() != 0) ||
954 (flowEntry.hardTimeout() != 0)) {
955 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
956 }
957
958 if (log.isTraceEnabled()) {
Jonathan Hartc00f5c22014-06-10 15:14:40 -0700959 log.trace("Installing flow entry {} into switch DPID: {} " +
960 "flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
Ray Milkey8e5170e2014-04-02 12:09:55 -0700961 , flowEntry.flowEntryUserState()
962 , sw.getStringId()
963 , flowEntry.flowEntryId()
964 , matchSrcMac
965 , matchDstMac
966 , matchInPort
967 , actionOutputPort
968 );
969 }
970
971 return addMessageImpl(sw, fm, priority);
972 }
973
974 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700975 * Add message to queue.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700976 *
977 * @param sw
978 * @param msg
Sho SHIMIZUa1199fa2014-06-10 18:11:12 -0700979 * @param priority
Ray Milkey8e5170e2014-04-02 12:09:55 -0700980 * @return
981 */
982 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg, MsgPriority priority) {
983 FlowPusherThread thread = getProcessingThread(sw);
984
985 SwitchQueue queue = getQueue(sw);
986
987 // create queue at first addition of message
988 if (queue == null) {
989 queue = createQueueImpl(sw);
990 }
991
992 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
993
994 synchronized (queue) {
995 queue.add(entry, priority);
996 if (log.isTraceEnabled()) {
997 log.trace("Message is pushed : {}", entry.getOFMessage());
998 }
999 }
1000
1001 thread.notifyMessagePushed();
1002
1003 return true;
1004 }
1005
1006 @Override
1007 public OFBarrierReply barrier(IOFSwitch sw) {
1008 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
1009 if (future == null) {
1010 return null;
1011 }
1012
1013 try {
1014 return future.get();
1015 } catch (InterruptedException e) {
1016 e.printStackTrace();
1017 log.error("InterruptedException: {}", e);
1018 return null;
1019 } catch (ExecutionException e) {
1020 e.printStackTrace();
1021 log.error("ExecutionException: {}", e);
1022 return null;
1023 }
1024 }
1025
1026 @Override
1027 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
1028 // TODO creation of message and future should be moved to OFSwitchImpl
1029
1030 if (sw == null) {
1031 return null;
1032 }
1033
1034 OFBarrierRequest msg = createBarrierRequest(sw);
1035
1036 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
1037 barrierFutures.put(BarrierInfo.create(sw, msg), future);
1038
1039 addMessageImpl(sw, msg, MsgPriority.NORMAL);
1040
1041 return future;
1042 }
1043
1044 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
1045 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
1046 msg.setXid(sw.getNextTransactionId());
1047
1048 return msg;
1049 }
1050
1051 /**
1052 * Get a queue attached to a switch.
1053 *
1054 * @param sw Switch object
1055 * @return Queue object
1056 */
1057 protected SwitchQueue getQueue(IOFSwitch sw) {
1058 if (sw == null) {
1059 return null;
1060 }
1061
1062 FlowPusherThread th = getProcessingThread(sw);
1063 if (th == null) {
1064 return null;
1065 }
1066
1067 return th.assignedQueues.get(sw);
1068 }
1069
1070 /**
1071 * Get a hash value correspondent to a switch.
1072 *
1073 * @param sw Switch object
1074 * @return Hash value
1075 */
1076 protected long getHash(IOFSwitch sw) {
1077 // This code assumes DPID is sequentially assigned.
1078 // TODO consider equalization algorithm
Ray Milkey2476cac2014-04-08 11:03:21 -07001079 return sw.getId() % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -07001080 }
1081
1082 /**
1083 * Get a Thread object which processes the queue attached to a switch.
1084 *
1085 * @param sw Switch object
1086 * @return Thread object
1087 */
1088 protected FlowPusherThread getProcessingThread(IOFSwitch sw) {
1089 long hash = getHash(sw);
1090
1091 return threadMap.get(hash);
1092 }
1093
1094 @Override
1095 public String getName() {
1096 return "flowpusher";
1097 }
1098
1099 @Override
1100 public boolean isCallbackOrderingPrereq(OFType type, String name) {
1101 return false;
1102 }
1103
1104 @Override
1105 public boolean isCallbackOrderingPostreq(OFType type, String name) {
1106 return false;
1107 }
1108
1109 @Override
1110 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
1111 if (log.isTraceEnabled()) {
1112 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
1113 }
1114
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -07001115 if ((msg.getType() != OFType.BARRIER_REPLY) ||
1116 !(msg instanceof OFBarrierReply)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -07001117 log.error("Unexpected reply message : {}", msg.getType());
1118 return Command.CONTINUE;
1119 }
1120
1121 OFBarrierReply reply = (OFBarrierReply) msg;
1122 BarrierInfo info = BarrierInfo.create(sw, reply);
1123
1124 // Deliver future if exists
1125 OFBarrierReplyFuture future = barrierFutures.get(info);
1126 if (future != null) {
1127 future.deliverFuture(sw, msg);
1128 barrierFutures.remove(info);
1129 }
1130
1131 return Command.CONTINUE;
1132 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001133}