blob: 824ed4f1a538c48fa2fb9b237860e00f44ec547c [file] [log] [blame]
Jonathan Hart23701d12014-04-03 10:45:48 -07001package net.onrc.onos.core.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007import java.util.HashMap;
Naoki Shiota47993102014-07-09 14:00:54 -07008import java.util.Iterator;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08009import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080010import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070011import java.util.Map;
Naoki Shiota47993102014-07-09 14:00:54 -070012import java.util.Map.Entry;
Naoki Shiota8df97bc2014-03-13 18:42:23 -070013import java.util.Queue;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota05334692014-03-18 16:06:36 -070016import java.util.concurrent.locks.Condition;
17import java.util.concurrent.locks.Lock;
18import java.util.concurrent.locks.ReentrantLock;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070019
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070020import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080021import net.floodlightcontroller.core.IFloodlightProviderService;
22import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080024import net.floodlightcontroller.core.internal.OFMessageFuture;
25import net.floodlightcontroller.core.module.FloodlightModuleContext;
26import net.floodlightcontroller.threadpool.IThreadPoolService;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070027import net.floodlightcontroller.util.MACAddress;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070028import net.onrc.onos.core.intent.FlowEntry;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070029import net.onrc.onos.core.matchaction.MatchAction;
30import net.onrc.onos.core.matchaction.MatchActionOperationEntry;
31import net.onrc.onos.core.matchaction.MatchActionOperations.Operator;
32import net.onrc.onos.core.matchaction.action.Action;
33import net.onrc.onos.core.matchaction.action.ModifyDstMacAction;
34import net.onrc.onos.core.matchaction.action.ModifySrcMacAction;
35import net.onrc.onos.core.matchaction.action.OutputAction;
36import net.onrc.onos.core.matchaction.match.Match;
37import net.onrc.onos.core.matchaction.match.PacketMatch;
Jonathan Hart5302b6c2014-08-13 15:57:59 -070038import net.onrc.onos.core.util.Dpid;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070039import net.onrc.onos.core.util.IPv4Net;
40import net.onrc.onos.core.util.SwitchPort;
Jonathan Harta99ec672014-04-03 11:30:34 -070041
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -070042import org.apache.commons.lang3.tuple.Pair;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070043import org.projectfloodlight.openflow.protocol.OFActionType;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -070044import org.projectfloodlight.openflow.protocol.OFBarrierReply;
45import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
46import org.projectfloodlight.openflow.protocol.OFFactory;
47import org.projectfloodlight.openflow.protocol.OFFlowMod;
48import org.projectfloodlight.openflow.protocol.OFMessage;
49import org.projectfloodlight.openflow.protocol.OFType;
Brian O'Connoraa5a7b92014-08-29 14:45:18 -070050import org.projectfloodlight.openflow.protocol.action.OFAction;
51import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
52import org.projectfloodlight.openflow.protocol.action.OFActions;
53import org.projectfloodlight.openflow.protocol.match.Match.Builder;
54import org.projectfloodlight.openflow.protocol.match.MatchField;
55import org.projectfloodlight.openflow.types.EthType;
56import org.projectfloodlight.openflow.types.IPv4Address;
57import org.projectfloodlight.openflow.types.IpProtocol;
58import org.projectfloodlight.openflow.types.MacAddress;
59import org.projectfloodlight.openflow.types.OFBufferId;
60import org.projectfloodlight.openflow.types.OFPort;
61import org.projectfloodlight.openflow.types.TransportPort;
62import org.projectfloodlight.openflow.types.U64;
Jonathan Harta99ec672014-04-03 11:30:34 -070063import org.slf4j.Logger;
64import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070065
Jonathan Hartc00f5c22014-06-10 15:14:40 -070066import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
67
/**
 * FlowPusher is an implementation of FlowPusherService. FlowPusher assigns one
 * message queue instance to each switch. The number of message processing
 * threads is configurable via the constructor, and one thread can handle
 * multiple message queues. Each queue is assigned to a thread according to the
 * hash function defined by getHash(). Each processing thread reads messages
 * from its queues and sends them to the switches in round-robin order. The
 * processing thread also calculates the sending rate in order to suppress
 * excessive message sending.
 */
Ray Milkey1584ec82014-04-10 11:58:30 -070077public final class FlowPusher implements IFlowPusherService, IOFMessageListener {
Ray Milkeyec838942014-04-09 11:28:43 -070078 private static final Logger log = LoggerFactory.getLogger(FlowPusher.class);
Jonathan Hart5302b6c2014-08-13 15:57:59 -070079 private static final int DEFAULT_NUMBER_THREAD = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080080
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080081 // Number of messages sent to switch at once
82 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080083
Jonathan Hart5302b6c2014-08-13 15:57:59 -070084 private FloodlightModuleContext context = null;
85 private IThreadPoolService threadPool = null;
86 private IFloodlightProviderService floodlightProvider = null;
87
88 // Map of threads versus dpid
89 private Map<Long, FlowPusherThread> threadMap = null;
90 // Map from (DPID and transaction ID) to Future objects.
91 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures =
92 new ConcurrentHashMap<BarrierInfo, OFBarrierReplyFuture>();
93
94 private int numberThread;
95
Ray Milkey8e5170e2014-04-02 12:09:55 -070096 private static class SwitchQueueEntry {
97 OFMessage msg;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070098
Ray Milkey8e5170e2014-04-02 12:09:55 -070099 public SwitchQueueEntry(OFMessage msg) {
100 this.msg = msg;
101 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700102
Ray Milkey8e5170e2014-04-02 12:09:55 -0700103 public OFMessage getOFMessage() {
104 return msg;
105 }
106 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700107
Ray Milkey8e5170e2014-04-02 12:09:55 -0700108 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700109 * SwitchQueue represents message queue attached to a switch. This consists
110 * of queue itself and variables used for limiting sending rate.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700111 */
Pavlin Radoslavovfee80982014-04-10 12:12:04 -0700112 private static class SwitchQueue {
Ray Milkey2476cac2014-04-08 11:03:21 -0700113 List<Queue<SwitchQueueEntry>> rawQueues;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700114 QueueState state;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800115
Ray Milkey8e5170e2014-04-02 12:09:55 -0700116 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700117 long maxRate = 0; // 0 indicates no limitation
Ray Milkey2476cac2014-04-08 11:03:21 -0700118 long lastSentTime = 0;
119 long lastSentSize = 0;
Naoki Shiota05334692014-03-18 16:06:36 -0700120
Ray Milkey8e5170e2014-04-02 12:09:55 -0700121 // "To be deleted" flag
122 boolean toBeDeleted = false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800123
Ray Milkey8e5170e2014-04-02 12:09:55 -0700124 SwitchQueue() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700125 rawQueues = new ArrayList<>(MsgPriority.values().length);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700126 for (int i = 0; i < MsgPriority.values().length; ++i) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700127 rawQueues.add(i, new ArrayDeque<SwitchQueueEntry>());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700128 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800129
Ray Milkey8e5170e2014-04-02 12:09:55 -0700130 state = QueueState.READY;
131 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800132
Ray Milkey8e5170e2014-04-02 12:09:55 -0700133 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700134 * Check if sending rate is within the rate.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700135 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700136 * @param current Current time
137 * @return true if within the rate
138 */
139 boolean isSendable(long current) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700140 if (maxRate == 0) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700141 // no limitation
142 return true;
143 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800144
Ray Milkey2476cac2014-04-08 11:03:21 -0700145 if (current == lastSentTime) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700146 return false;
147 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800148
Ray Milkey8e5170e2014-04-02 12:09:55 -0700149 // Check if sufficient time (from aspect of rate) elapsed or not.
Ray Milkey2476cac2014-04-08 11:03:21 -0700150 long rate = lastSentSize / (current - lastSentTime);
151 return (rate < maxRate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700152 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800153
Ray Milkey8e5170e2014-04-02 12:09:55 -0700154 /**
155 * Log time and size of last sent data.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700156 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700157 * @param current Time to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700158 * @param size Size of sent data (in bytes).
Ray Milkey8e5170e2014-04-02 12:09:55 -0700159 */
160 void logSentData(long current, long size) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700161 lastSentTime = current;
162 lastSentSize = size;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700163 }
Naoki Shiota8df97bc2014-03-13 18:42:23 -0700164
Ray Milkey8e5170e2014-04-02 12:09:55 -0700165 boolean add(SwitchQueueEntry entry, MsgPriority priority) {
166 Queue<SwitchQueueEntry> queue = getQueue(priority);
167 if (queue == null) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700168 log.error("Unexpected priority: {}", priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700169 return false;
170 }
171 return queue.add(entry);
172 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800173
Ray Milkey8e5170e2014-04-02 12:09:55 -0700174 /**
175 * Poll single appropriate entry object according to QueueState.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700176 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700177 * @return Entry object.
178 */
179 SwitchQueueEntry poll() {
180 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700181 case READY: {
182 for (int i = 0; i < rawQueues.size(); ++i) {
183 SwitchQueueEntry entry = rawQueues.get(i).poll();
184 if (entry != null) {
185 return entry;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700186 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700187 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800188
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700189 return null;
190 }
191 case SUSPENDED: {
192 // Only polling from high priority queue
193 SwitchQueueEntry entry = getQueue(MsgPriority.HIGH).poll();
194 return entry;
195 }
196 default:
197 log.error("Unexpected QueueState: {}", state);
198 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700199 }
200 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800201
Ray Milkey8e5170e2014-04-02 12:09:55 -0700202 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700203 * Check if this object has any messages in the queues to be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700204 * <p>
Ray Milkey8e5170e2014-04-02 12:09:55 -0700205 * @return True if there are some messages to be sent.
206 */
207 boolean hasMessageToSend() {
208 switch (state) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700209 case READY:
210 for (Queue<SwitchQueueEntry> queue : rawQueues) {
211 if (!queue.isEmpty()) {
212 return true;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700213 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700214 }
215 break;
216 case SUSPENDED:
217 // Only checking high priority queue
218 return (!getQueue(MsgPriority.HIGH).isEmpty());
219 default:
220 log.error("Unexpected QueueState: {}", state);
221 return false;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700222 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800223
Ray Milkey8e5170e2014-04-02 12:09:55 -0700224 return false;
225 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800226
Ray Milkey8e5170e2014-04-02 12:09:55 -0700227 Queue<SwitchQueueEntry> getQueue(MsgPriority priority) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700228 return rawQueues.get(priority.ordinal());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700229 }
230 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800231
Ray Milkey8e5170e2014-04-02 12:09:55 -0700232 /**
233 * BarrierInfo holds information to specify barrier message sent to switch.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700234 */
Ray Milkey1584ec82014-04-10 11:58:30 -0700235 private static final class BarrierInfo {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700236 final long dpid;
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700237 final long xid;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800238
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700239 static BarrierInfo create(long dpid, OFBarrierRequest req) {
240 return new BarrierInfo(dpid, req.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700241 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800242
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700243 static BarrierInfo create(long dpid, OFBarrierReply rpy) {
244 return new BarrierInfo(dpid, rpy.getXid());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700245 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800246
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700247 private BarrierInfo(long dpid, long xid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700248 this.dpid = dpid;
249 this.xid = xid;
250 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800251
Ray Milkey8e5170e2014-04-02 12:09:55 -0700252 // Auto generated code by Eclipse
253 @Override
254 public int hashCode() {
255 final int prime = 31;
256 int result = 1;
257 result = prime * result + (int) (dpid ^ (dpid >>> 32));
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700258 result = prime * result + (int) (xid ^ (xid >>> 32));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700259 return result;
260 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800261
Ray Milkey8e5170e2014-04-02 12:09:55 -0700262 @Override
263 public boolean equals(Object obj) {
Ray Milkeyb29e6262014-04-09 16:02:14 -0700264 if (this == obj) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700265 return true;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700266 }
267 if (obj == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700268 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700269 }
270 if (getClass() != obj.getClass()) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700271 return false;
Ray Milkeyb29e6262014-04-09 16:02:14 -0700272 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800273
Ray Milkey8e5170e2014-04-02 12:09:55 -0700274 BarrierInfo other = (BarrierInfo) obj;
275 return (this.dpid == other.dpid) && (this.xid == other.xid);
276 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800277
Ray Milkey8e5170e2014-04-02 12:09:55 -0700278 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800279
Ray Milkey8e5170e2014-04-02 12:09:55 -0700280 /**
281 * Main thread that reads messages from queues and sends them to switches.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700282 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700283 private class FlowPusherThread extends Thread {
284 private Map<Dpid, SwitchQueue> assignedQueues = new ConcurrentHashMap<>();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800285
Ray Milkey8e5170e2014-04-02 12:09:55 -0700286 final Lock queuingLock = new ReentrantLock();
287 final Condition messagePushed = queuingLock.newCondition();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800288
Ray Milkey8e5170e2014-04-02 12:09:55 -0700289 @Override
290 public void run() {
291 this.setName("FlowPusherThread " + this.getId());
292 while (true) {
293 while (!queuesHasMessageToSend()) {
294 queuingLock.lock();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800295
Ray Milkey8e5170e2014-04-02 12:09:55 -0700296 try {
297 // wait for message pushed to queue
298 messagePushed.await();
299 } catch (InterruptedException e) {
300 // Interrupted to be shut down (not an error)
301 log.debug("FlowPusherThread is interrupted");
302 return;
303 } finally {
304 queuingLock.unlock();
305 }
306 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800307
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700308 for (Iterator<Entry<Dpid, SwitchQueue>> it = assignedQueues
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700309 .entrySet().iterator(); it.hasNext();) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700310 Entry<Dpid, SwitchQueue> entry = it.next();
Naoki Shiota47993102014-07-09 14:00:54 -0700311 SwitchQueue queue = entry.getValue();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800312
Yuta HIGUCHI0fe749a2014-05-27 09:35:16 -0700313 if (queue == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700314 continue;
315 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800316
Ray Milkey8e5170e2014-04-02 12:09:55 -0700317 synchronized (queue) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700318 processQueue(entry.getKey(), queue, MAX_MESSAGE_SEND);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700319 if (queue.toBeDeleted && !queue.hasMessageToSend()) {
320 // remove queue if flagged to be.
Naoki Shiota47993102014-07-09 14:00:54 -0700321 it.remove();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700322 }
323 }
324 }
325 }
326 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800327
Ray Milkey8e5170e2014-04-02 12:09:55 -0700328 /**
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700329 * Read messages from queue and send them to the switch. If number of
330 * messages excess the limit, stop sending messages.
331 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700332 * @param dpid DPID of the switch to which messages will be sent.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700333 * @param queue Queue of messages.
334 * @param maxMsg Limitation of number of messages to be sent. If set to
335 * 0, all messages in queue will be sent.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700336 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700337 private void processQueue(Dpid dpid, SwitchQueue queue, int maxMsg) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700338 // check sending rate and determine it to be sent or not
Ray Milkey2476cac2014-04-08 11:03:21 -0700339 long currentTime = System.currentTimeMillis();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700340 long size = 0;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800341
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700342 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
343 if (sw == null) {
344 // FlowPusher state for this switch will get cleaned up soon
345 // due to the switchDisconnected event
346 log.debug("Switch {} not found when processing queue", dpid);
347 return;
348 }
349
Naoki Shiota47993102014-07-09 14:00:54 -0700350 if (sw.isConnected() && queue.isSendable(currentTime)) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700351 int i = 0;
352 while (queue.hasMessageToSend()) {
353 // Number of messages excess the limit
Ray Milkey9526d6f2014-04-10 14:54:15 -0700354 if (0 < maxMsg && maxMsg <= i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700355 break;
356 }
357 ++i;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800358
Ray Milkey8e5170e2014-04-02 12:09:55 -0700359 SwitchQueueEntry queueEntry;
360 synchronized (queue) {
361 queueEntry = queue.poll();
362 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800363
Ray Milkey8e5170e2014-04-02 12:09:55 -0700364 OFMessage msg = queueEntry.getOFMessage();
365 try {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700366 sw.write(msg, null);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700367 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700368 log.trace("Pusher sends message to switch {}: {}", sw.getStringId(), msg);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700369 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700370 // TODO BOC how do we get the size?
371 // size += msg.getLength();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700372 } catch (IOException e) {
Yuta HIGUCHIe933a282014-07-13 21:02:29 -0700373 log.error("Exception in sending message (" + msg + "):", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700374 }
375 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800376
Ray Milkey8e5170e2014-04-02 12:09:55 -0700377 sw.flush();
Ray Milkey2476cac2014-04-08 11:03:21 -0700378 queue.logSentData(currentTime, size);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700379 }
380 }
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800381
Ray Milkey8e5170e2014-04-02 12:09:55 -0700382 private boolean queuesHasMessageToSend() {
383 for (SwitchQueue queue : assignedQueues.values()) {
384 if (queue.hasMessageToSend()) {
385 return true;
386 }
387 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700388
Ray Milkey8e5170e2014-04-02 12:09:55 -0700389 return false;
390 }
Naoki Shiota05334692014-03-18 16:06:36 -0700391
Ray Milkey8e5170e2014-04-02 12:09:55 -0700392 private void notifyMessagePushed() {
393 queuingLock.lock();
394 try {
395 messagePushed.signal();
396 } finally {
397 queuingLock.unlock();
398 }
399 }
400 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700401
/**
 * Creates a FlowPusher that uses the default number of threads
 * (DEFAULT_NUMBER_THREAD).
 */
public FlowPusher() {
    this(DEFAULT_NUMBER_THREAD);
}
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800408
/**
 * Creates a FlowPusher that uses the given number of threads.
 * <p>
 * @param numberThreadValue Number of threads to handle messages. A value
 *        that is not positive selects the default number of threads.
 */
public FlowPusher(int numberThreadValue) {
    numberThread = (numberThreadValue > 0)
            ? numberThreadValue
            : DEFAULT_NUMBER_THREAD;
}
Naoki Shiota81dbe302013-11-21 15:35:38 -0800421
Ray Milkey8e5170e2014-04-02 12:09:55 -0700422 /**
423 * Set parameters needed for sending messages.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700424 * <p>
425 * @param floodlightContext FloodlightModuleContext used for acquiring
426 * ThreadPoolService and registering MessageListener.
Ray Milkey8e5170e2014-04-02 12:09:55 -0700427 */
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700428 public void init(FloodlightModuleContext floodlightContext) {
429 this.context = floodlightContext;
430 this.floodlightProvider = context
431 .getServiceImpl(IFloodlightProviderService.class);
432 this.threadPool = context.getServiceImpl(IThreadPoolService.class);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800433
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700434 floodlightProvider.addOFMessageListener(OFType.BARRIER_REPLY, this);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700435 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800436
Ray Milkey8e5170e2014-04-02 12:09:55 -0700437 /**
438 * Begin processing queue.
439 */
440 public void start() {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700441 threadMap = new HashMap<>();
Ray Milkey2476cac2014-04-08 11:03:21 -0700442 for (long i = 0; i < numberThread; ++i) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700443 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotac1601d32013-11-20 10:47:34 -0800444
Ray Milkey8e5170e2014-04-02 12:09:55 -0700445 threadMap.put(i, thread);
446 thread.start();
447 }
448 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800449
Ray Milkey8e5170e2014-04-02 12:09:55 -0700450 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700451 public boolean suspend(Dpid dpid) {
452 SwitchQueue queue = getQueue(dpid);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700453
Ray Milkey8e5170e2014-04-02 12:09:55 -0700454 if (queue == null) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700455 // create queue in case suspend is called before first message
456 // addition
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700457 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700458 }
459
460 synchronized (queue) {
461 if (queue.state == QueueState.READY) {
462 queue.state = QueueState.SUSPENDED;
463 return true;
464 }
465 return false;
466 }
467 }
468
469 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700470 public boolean resume(Dpid dpid) {
471 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700472
473 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700474 log.error("No queue is attached to DPID: {}", dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700475 return false;
476 }
477
478 synchronized (queue) {
479 if (queue.state == QueueState.SUSPENDED) {
480 queue.state = QueueState.READY;
481
482 // Free the latch if queue has any messages
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700483 FlowPusherThread thread = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700484 if (queue.hasMessageToSend()) {
485 thread.notifyMessagePushed();
486 }
487 return true;
488 }
489 return false;
490 }
491 }
492
493 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700494 public QueueState getState(Dpid dpid) {
495 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700496
497 if (queue == null) {
498 return QueueState.UNKNOWN;
499 }
500
501 return queue.state;
502 }
503
504 /**
505 * Stop processing queue and exit thread.
506 */
507 public void stop() {
508 if (threadMap == null) {
509 return;
510 }
511
512 for (FlowPusherThread t : threadMap.values()) {
513 t.interrupt();
514 }
515 }
516
517 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700518 public void setRate(Dpid dpid, long rate) {
519 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700520 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700521 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700522 }
523
524 if (rate > 0) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700525 log.debug("rate for {} is set to {}", dpid, rate);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700526 synchronized (queue) {
Ray Milkey2476cac2014-04-08 11:03:21 -0700527 queue.maxRate = rate;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700528 }
529 }
530 }
531
532 @Override
Pavlin Radoslavov9fc535a2014-04-11 13:00:12 -0700533 @SuppressFBWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE",
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700534 justification = "Future versions of createQueueImpl() might return null")
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700535 public boolean createQueue(Dpid dpid) {
536 SwitchQueue queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700537
538 return (queue != null);
539 }
540
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700541 protected SwitchQueue createQueueImpl(Dpid dpid) {
542 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700543 if (queue != null) {
544 return queue;
545 }
546
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700547 FlowPusherThread proc = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700548 queue = new SwitchQueue();
549 queue.state = QueueState.READY;
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700550 proc.assignedQueues.put(dpid, queue);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700551
552 return queue;
553 }
554
555 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700556 public boolean deleteQueue(Dpid dpid) {
557 return deleteQueue(dpid, false);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700558 }
559
560 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700561 public boolean deleteQueue(Dpid dpid, boolean forceStop) {
562 FlowPusherThread proc = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700563
564 if (forceStop) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700565 SwitchQueue queue = proc.assignedQueues.remove(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700566 if (queue == null) {
567 return false;
568 }
569 return true;
570 } else {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700571 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700572 if (queue == null) {
573 return false;
574 }
575 synchronized (queue) {
576 queue.toBeDeleted = true;
577 }
578 return true;
579 }
580 }
581
582 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700583 public boolean add(Dpid dpid, OFMessage msg) {
584 return add(dpid, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700585 }
586
587 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700588 public boolean add(Dpid dpid, OFMessage msg, MsgPriority priority) {
589 return addMessageImpl(dpid, msg, priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700590 }
591
592 @Override
593 public void pushFlowEntries(
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700594 Collection<Pair<Dpid, FlowEntry>> entries) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700595 pushFlowEntries(entries, MsgPriority.NORMAL);
596 }
597
598 @Override
599 public void pushFlowEntries(
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700600 Collection<Pair<Dpid, FlowEntry>> entries, MsgPriority priority) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700601
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700602 for (Pair<Dpid, FlowEntry> entry : entries) {
Pavlin Radoslavovb05aac92014-08-26 18:26:10 -0700603 add(entry.getLeft(), entry.getRight(), priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700604 }
605 }
606
607 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700608 public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry) {
609 pushFlowEntry(dpid, flowEntry, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700610 }
611
612 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700613 public void pushFlowEntry(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
614 Collection<Pair<Dpid, FlowEntry>> entries = new LinkedList<>();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700615
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700616 entries.add(Pair.of(dpid, flowEntry));
Ray Milkey8e5170e2014-04-02 12:09:55 -0700617 pushFlowEntries(entries, priority);
618 }
619
Brian O'Connoraa5a7b92014-08-29 14:45:18 -0700620 public static final int PRIORITY_DEFAULT = 32768; // Default Flow Priority
621
622 @Override
623 public void pushMatchAction(MatchActionOperationEntry matchActionOp) {
624 final MatchAction matchAction = matchActionOp.getTarget();
625
626 // Get the switch and its OFFactory
627 final SwitchPort srcPort = matchAction.getSwitchPort();
628 final Dpid dpid = srcPort.getDpid();
629 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
630 if (sw == null) {
631 log.warn("Couldn't find switch {} when pushing message", dpid);
632 return;
633 }
634 OFFactory factory = sw.getFactory();
635
636 // Build Match
637 final Match match = matchAction.getMatch();
638 Builder matchBuilder = factory.buildMatch();
639 if (match instanceof PacketMatch) {
640 final PacketMatch packetMatch = (PacketMatch) match;
641 final MACAddress srcMac = packetMatch.getSrcMacAddress();
642 final MACAddress dstMac = packetMatch.getDstMacAddress();
643 final Short etherType = packetMatch.getEtherType();
644 final IPv4Net srcIp = packetMatch.getSrcIpAddress();
645 final IPv4Net dstIp = packetMatch.getDstIpAddress();
646 final Byte ipProto = packetMatch.getIpProtocolNumber();
647 final Short srcTcpPort = packetMatch.getSrcTcpPortNumber();
648 final Short dstTcpPort = packetMatch.getDstTcpPortNumber();
649
650 if (srcMac != null) {
651 matchBuilder.setExact(MatchField.ETH_SRC, MacAddress.of(srcMac.toLong()));
652 }
653 if (dstMac != null) {
654 matchBuilder.setExact(MatchField.ETH_DST, MacAddress.of(dstMac.toLong()));
655 }
656 if (etherType != null) {
657 matchBuilder.setExact(MatchField.ETH_TYPE, EthType.of(etherType));
658 }
659 if (srcIp != null) {
660 matchBuilder.setMasked(MatchField.IPV4_SRC,
661 IPv4Address.of(srcIp.address().value())
662 .withMaskOfLength(srcIp.prefixLen()));
663 }
664 if (dstIp != null) {
665 matchBuilder.setMasked(MatchField.IPV4_DST,
666 IPv4Address.of(dstIp.address().value())
667 .withMaskOfLength(dstIp.prefixLen()));
668 }
669 if (ipProto != null) {
670 matchBuilder.setExact(MatchField.IP_PROTO, IpProtocol.of(ipProto));
671 }
672 if (srcTcpPort != null) {
673 matchBuilder.setExact(MatchField.TCP_SRC, TransportPort.of(srcTcpPort));
674 }
675 if (dstTcpPort != null) {
676 matchBuilder.setExact(MatchField.TCP_DST, TransportPort.of(dstTcpPort));
677 }
678 matchBuilder.setExact(MatchField.IN_PORT,
679 OFPort.of(srcPort.getPortNumber().shortValue()));
680 } else {
681 log.warn("Unsupported Match type: {}", match.getClass().getName());
682 return;
683 }
684
685 // Build Actions
686 List<OFAction> actionList = new ArrayList<>(matchAction.getActions().size());
687 OFActions ofActionTypes = factory.actions();
688 for (Action action : matchAction.getActions()) {
689 OFAction ofAction = null;
690 if (action instanceof OutputAction) {
691 OutputAction outputAction = (OutputAction) action;
692 // short or int?
693 OFPort port = OFPort.of((int) outputAction.getPortNumber().value());
694 ofAction = ofActionTypes.output(port, Short.MAX_VALUE);
695 } else if (action instanceof ModifyDstMacAction) {
696 ModifyDstMacAction dstMacAction = (ModifyDstMacAction) action;
697 ofActionTypes.setDlDst(MacAddress.of(dstMacAction.getDstMac().toLong()));
698 } else if (action instanceof ModifySrcMacAction) {
699 ModifySrcMacAction srcMacAction = (ModifySrcMacAction) action;
700 ofActionTypes.setDlSrc(MacAddress.of(srcMacAction.getSrcMac().toLong()));
701 } else {
702 log.warn("Unsupported Action type: {}", action.getClass().getName());
703 continue;
704 }
705 actionList.add(ofAction);
706 }
707
708 // Construct a FlowMod message builder
709 OFFlowMod.Builder fmBuilder = null;
710 switch (matchActionOp.getOperator()) {
711 case ADD:
712 fmBuilder = factory.buildFlowAdd();
713 break;
714 case REMOVE:
715 fmBuilder = factory.buildFlowDeleteStrict();
716 break;
717 // case MODIFY: // TODO
718 // fmBuilder = factory.buildFlowModifyStrict();
719 // break;
720 default:
721 log.warn("Unsupported MatchAction Operator: {}", matchActionOp.getOperator());
722 return;
723 }
724
725 // Add output port for OF1.0
726 OFPort outp = OFPort.of((short) 0xffff); // OF1.0 OFPP.NONE
727 if (matchActionOp.getOperator() == Operator.REMOVE) {
728 if (actionList.size() == 1) {
729 if (actionList.get(0).getType() == OFActionType.OUTPUT) {
730 OFActionOutput oa = (OFActionOutput) actionList.get(0);
731 outp = oa.getPort();
732 }
733 }
734 }
735
736
737 // Build OFFlowMod
738 fmBuilder.setMatch(matchBuilder.build())
739 .setActions(actionList)
740 .setIdleTimeout(0) // hardcoded to zero for now
741 .setHardTimeout(0) // hardcoded to zero for now
742 .setCookie(U64.of(matchAction.getId().value()))
743 .setBufferId(OFBufferId.NO_BUFFER)
744 .setPriority(PRIORITY_DEFAULT)
745 .setOutPort(outp);
746
747 // Build the message and add it to the queue
748 add(dpid, fmBuilder.build());
749 }
750
751 @Override
752 public void pushMatchActions(Collection<MatchActionOperationEntry> matchActionOps) {
753 for (MatchActionOperationEntry matchActionOp : matchActionOps) {
754 pushMatchAction(matchActionOp);
755 }
756 }
757
Ray Milkey8e5170e2014-04-02 12:09:55 -0700758 /**
Ray Milkeyddcd4922014-04-17 11:21:20 -0700759 * Create a message from FlowEntry and add it to the queue of the switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700760 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700761 * @param dpid DPID of the switch to which the message is pushed.
Ray Milkeyddcd4922014-04-17 11:21:20 -0700762 * @param flowEntry FlowEntry object used for creating message.
763 * @return true if message is successfully added to a queue.
764 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700765 private boolean add(Dpid dpid, FlowEntry flowEntry, MsgPriority priority) {
766 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
767 if (sw == null) {
768 log.warn("Couldn't find switch {} when pushing message", dpid);
769 return false;
770 }
771
Ray Milkeyddcd4922014-04-17 11:21:20 -0700772 //
773 // Create the OpenFlow Flow Modification Entry to push
774 //
Jonathan Harta213bce2014-08-11 15:44:07 -0700775 OFFlowMod fm = flowEntry.buildFlowMod(sw.getFactory());
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700776 // log.trace("Pushing flow mod {}", fm);
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700777 return addMessageImpl(dpid, fm, priority);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700778 }
779
780 /**
Ray Milkeyb41100a2014-04-10 10:42:15 -0700781 * Add message to queue.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700782 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700783 * @param dpid DPID of the switch to which the message is sent
784 * @param msg message to send to the switch
785 * @param priority priority of the message
Jonathan Hart99ff20a2014-06-15 16:53:00 -0700786 * @return true if the message was added successfully, otherwise false
Ray Milkey8e5170e2014-04-02 12:09:55 -0700787 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700788 protected boolean addMessageImpl(Dpid dpid, OFMessage msg, MsgPriority priority) {
789 FlowPusherThread thread = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700790
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700791 SwitchQueue queue = getQueue(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700792
793 // create queue at first addition of message
794 if (queue == null) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700795 queue = createQueueImpl(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700796 }
797
798 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
799
800 synchronized (queue) {
801 queue.add(entry, priority);
802 if (log.isTraceEnabled()) {
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700803 log.trace("Message is pushed to switch {}: {}",
804 dpid, entry.getOFMessage());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700805 }
806 }
807
808 thread.notifyMessagePushed();
809
810 return true;
811 }
812
813 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700814 public OFBarrierReply barrier(Dpid dpid) {
815 OFMessageFuture<OFBarrierReply> future = barrierAsync(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700816 if (future == null) {
817 return null;
818 }
Ray Milkey8e5170e2014-04-02 12:09:55 -0700819 try {
820 return future.get();
821 } catch (InterruptedException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700822 log.error("InterruptedException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700823 } catch (ExecutionException e) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700824 log.error("ExecutionException", e);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700825 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700826 return null;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700827 }
828
829 @Override
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700830 public OFMessageFuture<OFBarrierReply> barrierAsync(Dpid dpid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700831 // TODO creation of message and future should be moved to OFSwitchImpl
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700832 IOFSwitch sw = floodlightProvider.getMasterSwitch(dpid.value());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700833 if (sw == null) {
834 return null;
835 }
836
837 OFBarrierRequest msg = createBarrierRequest(sw);
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700838 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw,
839 (int) msg.getXid());
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700840 barrierFutures.put(BarrierInfo.create(dpid.value(), msg), future);
841 addMessageImpl(dpid, msg, MsgPriority.NORMAL);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700842 return future;
843 }
844
845 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
Jonathan Harta213bce2014-08-11 15:44:07 -0700846 OFFactory factory = sw.getFactory();
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700847 if (factory == null) {
848 log.error("No OF Message Factory for switch {} with OFVersion {}", sw,
849 sw.getOFVersion());
850 return null;
851 }
852 return factory.buildBarrierRequest()
853 .setXid(sw.getNextTransactionId())
854 .build();
Ray Milkey8e5170e2014-04-02 12:09:55 -0700855 }
856
857 /**
858 * Get a queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700859 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700860 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700861 * @return Queue object
862 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700863 protected SwitchQueue getQueue(Dpid dpid) {
864 if (dpid == null) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700865 return null;
866 }
867
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700868 FlowPusherThread th = getProcessingThread(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700869 if (th == null) {
870 return null;
871 }
872
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700873 return th.assignedQueues.get(dpid);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700874 }
875
876 /**
877 * Get a hash value correspondent to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700878 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700879 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700880 * @return Hash value
881 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700882 protected long getHash(long dpid) {
Ray Milkey8e5170e2014-04-02 12:09:55 -0700883 // This code assumes DPID is sequentially assigned.
884 // TODO consider equalization algorithm
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700885 return dpid % numberThread;
Ray Milkey8e5170e2014-04-02 12:09:55 -0700886 }
887
888 /**
889 * Get a Thread object which processes the queue attached to a switch.
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700890 * <p>
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700891 * @param dpid DPID of the switch
Ray Milkey8e5170e2014-04-02 12:09:55 -0700892 * @return Thread object
893 */
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700894 protected FlowPusherThread getProcessingThread(Dpid dpid) {
895 long hash = getHash(dpid.value());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700896
897 return threadMap.get(hash);
898 }
899
900 @Override
901 public String getName() {
902 return "flowpusher";
903 }
904
905 @Override
906 public boolean isCallbackOrderingPrereq(OFType type, String name) {
907 return false;
908 }
909
910 @Override
911 public boolean isCallbackOrderingPostreq(OFType type, String name) {
912 return false;
913 }
914
915 @Override
916 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
917 if (log.isTraceEnabled()) {
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700918 log.trace("Received BARRIER_REPLY from : {}", sw.getStringId());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700919 }
920
Pavlin Radoslavov0b88a262014-04-10 15:43:27 -0700921 if ((msg.getType() != OFType.BARRIER_REPLY) ||
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700922 !(msg instanceof OFBarrierReply)) {
Naoki Shiota72afb332014-07-22 17:39:00 -0700923 log.error("Unexpected reply message: {}", msg.getType());
Ray Milkey8e5170e2014-04-02 12:09:55 -0700924 return Command.CONTINUE;
925 }
926
927 OFBarrierReply reply = (OFBarrierReply) msg;
Jonathan Hart5302b6c2014-08-13 15:57:59 -0700928 BarrierInfo info = BarrierInfo.create(sw.getId(), reply);
Ray Milkey8e5170e2014-04-02 12:09:55 -0700929 // Deliver future if exists
930 OFBarrierReplyFuture future = barrierFutures.get(info);
931 if (future != null) {
932 future.deliverFuture(sw, msg);
933 barrierFutures.remove(info);
934 }
935
936 return Command.CONTINUE;
937 }
Brian O'Connorc67f9fa2014-08-07 18:17:46 -0700938
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700939}