blob: d2af973b3f337655de5f43228a4d5006510fd5c3 [file] [log] [blame]
Naoki Shiotaaea88582013-11-12 17:58:34 -08001package net.onrc.onos.ofcontroller.flowprogrammer;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07002
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08004import java.util.ArrayDeque;
5import java.util.ArrayList;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -08006import java.util.Collection;
Naoki Shiota7d0cf272013-11-05 10:18:12 -08007import java.util.EnumSet;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07008import java.util.HashMap;
Naoki Shiotaf03592e2013-11-27 11:20:39 -08009import java.util.HashSet;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080010import java.util.LinkedList;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080011import java.util.List;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012import java.util.Map;
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080013import java.util.Set;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070014import java.util.concurrent.ConcurrentHashMap;
Naoki Shiotac1601d32013-11-20 10:47:34 -080015import java.util.concurrent.ExecutionException;
Naoki Shiota81dbe302013-11-21 15:35:38 -080016import java.util.concurrent.Semaphore;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070017
Naoki Shiota7d0cf272013-11-05 10:18:12 -080018import org.openflow.protocol.*;
19import org.openflow.protocol.action.*;
20import org.openflow.protocol.factory.BasicFactory;
21import org.slf4j.Logger;
22import org.slf4j.LoggerFactory;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070023
24import net.floodlightcontroller.core.FloodlightContext;
Naoki Shiotac1601d32013-11-20 10:47:34 -080025import net.floodlightcontroller.core.IFloodlightProviderService;
26import net.floodlightcontroller.core.IOFMessageListener;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070027import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotac1601d32013-11-20 10:47:34 -080028import net.floodlightcontroller.core.internal.OFMessageFuture;
29import net.floodlightcontroller.core.module.FloodlightModuleContext;
30import net.floodlightcontroller.threadpool.IThreadPoolService;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080031import net.floodlightcontroller.util.MACAddress;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080032import net.floodlightcontroller.util.OFMessageDamper;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080033import net.onrc.onos.ofcontroller.util.FlowEntryAction;
34import net.onrc.onos.ofcontroller.util.FlowEntryAction.*;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080035import net.onrc.onos.ofcontroller.util.FlowEntry;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080036import net.onrc.onos.ofcontroller.util.FlowEntryActions;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080037import net.onrc.onos.ofcontroller.util.FlowEntryMatch;
38import net.onrc.onos.ofcontroller.util.FlowEntryUserState;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080039import net.onrc.onos.ofcontroller.util.IPv4Net;
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -080040import net.onrc.onos.ofcontroller.util.Pair;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -080041import net.onrc.onos.ofcontroller.util.Port;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070042
43/**
Naoki Shiotab485d412013-11-26 12:04:19 -080044 * FlowPusher is a implementation of FlowPusherService.
45 * FlowPusher assigns one message queue instance for each one switch.
46 * Number of message processing threads is configurable by constructor, and
47 * one thread can handle multiple message queues. Each queue will be assigned to
48 * a thread according to hash function defined by getHash().
49 * Each processing thread reads messages from queues and sends it to switches
50 * in round-robin. Processing thread also calculates rate of sending to suppress
51 * excessive message sending.
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070052 * @author Naoki Shiota
53 *
54 */
Naoki Shiotac1601d32013-11-20 10:47:34 -080055public class FlowPusher implements IFlowPusherService, IOFMessageListener {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080056 private final static Logger log = LoggerFactory.getLogger(FlowPusher.class);
57
58 // NOTE: Below are moved from FlowManager.
59 // TODO: Values copied from elsewhere (class LearningSwitch).
60 // The local copy should go away!
61 //
62 protected static final int OFMESSAGE_DAMPER_CAPACITY = 50000; // TODO: find sweet spot
63 protected static final int OFMESSAGE_DAMPER_TIMEOUT = 250; // ms
Naoki Shiota5c8d19f2013-11-05 15:52:38 -080064
Naoki Shiota7d0bcfa2013-11-13 10:43:33 -080065 // Number of messages sent to switch at once
66 protected static final int MAX_MESSAGE_SEND = 100;
Naoki Shiota7d0cf272013-11-05 10:18:12 -080067
Pavlin Radoslavovafbf1032014-02-04 10:37:52 -080068 public enum QueueState {
Naoki Shiota7d0cf272013-11-05 10:18:12 -080069 READY,
70 SUSPENDED,
71 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070072
73 private static class SwitchQueueEntry {
74 OFMessage msg;
75
76 public SwitchQueueEntry(OFMessage msg) {
77 this.msg = msg;
78 }
79
80 public OFMessage getOFMessage() {
81 return msg;
82 }
83 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070084
Naoki Shiotac1601d32013-11-20 10:47:34 -080085 /**
Naoki Shiotab485d412013-11-26 12:04:19 -080086 * SwitchQueue represents message queue attached to a switch.
Naoki Shiotac1601d32013-11-20 10:47:34 -080087 * This consists of queue itself and variables used for limiting sending rate.
88 * @author Naoki Shiota
89 *
90 */
Naoki Shiotad6ef3b32014-03-13 18:42:23 -070091 private class SwitchQueue extends ArrayDeque<SwitchQueueEntry> {
Naoki Shiota991093a2013-12-10 14:47:18 -080092 private static final long serialVersionUID = 1L;
93
Naoki Shiota7d0cf272013-11-05 10:18:12 -080094 QueueState state;
95
Naoki Shiotae3199732013-11-25 16:14:43 -080096 // Max rate of sending message (bytes/ms). 0 implies no limitation.
Naoki Shiota8ee48d52013-11-11 15:51:17 -080097 long max_rate = 0; // 0 indicates no limitation
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070098 long last_sent_time = 0;
99 long last_sent_size = 0;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800100
Naoki Shiotae3199732013-11-25 16:14:43 -0800101 // "To be deleted" flag
102 boolean toBeDeleted = false;
103
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800104 /**
105 * Check if sending rate is within the rate
106 * @param current Current time
107 * @return true if within the rate
108 */
109 boolean isSendable(long current) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800110 if (max_rate == 0) {
111 // no limitation
112 return true;
113 }
114
Naoki Shiota81dbe302013-11-21 15:35:38 -0800115 if (current == last_sent_time) {
116 return false;
117 }
118
Naoki Shiotac1601d32013-11-20 10:47:34 -0800119 // Check if sufficient time (from aspect of rate) elapsed or not.
Naoki Shiotab485d412013-11-26 12:04:19 -0800120 long rate = last_sent_size / (current - last_sent_time);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800121 return (rate < max_rate);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800122 }
123
Naoki Shiota81dbe302013-11-21 15:35:38 -0800124 /**
125 * Log time and size of last sent data.
126 * @param current Time to be sent.
127 * @param size Size of sent data (in bytes).
128 */
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800129 void logSentData(long current, long size) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800130 last_sent_time = current;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800131 last_sent_size = size;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800132 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700133 }
134
135 /**
136 * BarrierInfo holds information to specify barrier message sent to switch.
137 * @author Naoki
138 */
139 private static class BarrierInfo {
140 final long dpid;
141 final int xid;
142
143 static BarrierInfo create(IOFSwitch sw, OFBarrierRequest req) {
144 return new BarrierInfo(sw.getId(), req.getXid());
145 }
146
147 static BarrierInfo create(IOFSwitch sw, OFBarrierReply rpy) {
148 return new BarrierInfo(sw.getId(), rpy.getXid());
149 }
150
151 private BarrierInfo(long dpid, int xid) {
152 this.dpid = dpid;
153 this.xid = xid;
154 }
155
156 // Auto generated code by Eclipse
157 @Override
158 public int hashCode() {
159 final int prime = 31;
160 int result = 1;
161 result = prime * result + (int) (dpid ^ (dpid >>> 32));
162 result = prime * result + xid;
163 return result;
164 }
165
166 @Override
167 public boolean equals(Object obj) {
168 if (this == obj)
169 return true;
170 if (obj == null)
171 return false;
172 if (getClass() != obj.getClass())
173 return false;
174
175 BarrierInfo other = (BarrierInfo) obj;
176 return (this.dpid == other.dpid) && (this.xid == other.xid);
177 }
178
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800179
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700180 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800181
Naoki Shiotac1601d32013-11-20 10:47:34 -0800182 private OFMessageDamper messageDamper = null;
183 private IThreadPoolService threadPool = null;
Naoki Shiotacf1acca2013-10-31 11:40:32 -0700184
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800185 private FloodlightContext context = null;
186 private BasicFactory factory = null;
Naoki Shiotab485d412013-11-26 12:04:19 -0800187
188 // Map of threads versus dpid
Naoki Shiota81dbe302013-11-21 15:35:38 -0800189 private Map<Long, FlowPusherThread> threadMap = null;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700190 // Map from (DPID and transaction ID) to Future objects.
191 private Map<BarrierInfo, OFBarrierReplyFuture> barrierFutures
192 = new ConcurrentHashMap<BarrierInfo,OFBarrierReplyFuture>();
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800193
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800194 private int number_thread = 1;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800195
Naoki Shiota8739faa2013-11-18 17:00:25 -0800196 /**
197 * Main thread that reads messages from queues and sends them to switches.
198 * @author Naoki Shiota
199 *
200 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800201 private class FlowPusherThread extends Thread {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800202 private Map<IOFSwitch,SwitchQueue> queues
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700203 = new ConcurrentHashMap<IOFSwitch,SwitchQueue>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800204
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800205 // reusable latch used for waiting for arrival of message
Naoki Shiota81dbe302013-11-21 15:35:38 -0800206 private Semaphore mutex = new Semaphore(0);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800207
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700208 @Override
209 public void run() {
Yuta HIGUCHI61509a42013-12-17 10:41:04 -0800210 this.setName("FlowPusherThread " + this.getId() );
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700211 while (true) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800212 try {
213 // wait for message pushed to queue
214 mutex.acquire();
215 } catch (InterruptedException e) {
Naoki Shiota75b7dd62013-12-03 18:09:21 -0800216 // not an error
Naoki Shiota81dbe302013-11-21 15:35:38 -0800217 log.debug("FlowPusherThread is interrupted");
218 return;
219 }
220
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800221 // for safety of concurrent access, copy all key objects
222 Set<IOFSwitch> keys = new HashSet<IOFSwitch>(queues.size());
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700223 for (IOFSwitch sw : queues.keySet()) {
224 keys.add(sw);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800225 }
226
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800227 for (IOFSwitch sw : keys) {
228 SwitchQueue queue = queues.get(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800229
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700230 // Skip if queue is suspended
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800231 if (sw == null || queue == null ||
232 queue.state != QueueState.READY) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700233 continue;
234 }
235
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800236 synchronized (queue) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800237 processQueue(sw, queue, MAX_MESSAGE_SEND);
238 if (queue.isEmpty()) {
239 // remove queue if flagged to be.
240 if (queue.toBeDeleted) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700241 queues.remove(sw);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800242 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800243 } else {
244 // if some messages remains in queue, latch down
245 if (mutex.availablePermits() == 0) {
246 mutex.release();
Naoki Shiota81dbe302013-11-21 15:35:38 -0800247 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700248 }
249 }
250 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700251 }
252 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800253
254 /**
255 * Read messages from queue and send them to the switch.
256 * If number of messages excess the limit, stop sending messages.
257 * @param sw Switch to which messages will be sent.
258 * @param queue Queue of messages.
259 * @param max_msg Limitation of number of messages to be sent. If set to 0,
260 * all messages in queue will be sent.
261 */
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700262 private void processQueue(IOFSwitch sw, SwitchQueue queue, int max_msg) {
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800263 // check sending rate and determine it to be sent or not
264 long current_time = System.currentTimeMillis();
265 long size = 0;
266
267 if (queue.isSendable(current_time)) {
268 int i = 0;
269 while (! queue.isEmpty()) {
270 // Number of messages excess the limit
271 if (0 < max_msg && max_msg <= i) {
272 break;
273 }
274 ++i;
275
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700276 SwitchQueueEntry queueEntry;
277 synchronized (queue) {
278 queueEntry = queue.poll();
279 }
280
281 OFMessage msg = queueEntry.getOFMessage();
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800282 try {
283 messageDamper.write(sw, msg, context);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700284 if (log.isTraceEnabled()) {
285 log.trace("Pusher sends message : {}", msg);
286 }
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800287 size += msg.getLength();
288 } catch (IOException e) {
289 e.printStackTrace();
290 log.error("Exception in sending message ({}) : {}", msg, e);
291 }
292 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700293
Naoki Shiotaf03592e2013-11-27 11:20:39 -0800294 sw.flush();
295 queue.logSentData(current_time, size);
296 }
297 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700298 }
299
Naoki Shiotac1601d32013-11-20 10:47:34 -0800300 /**
301 * Initialize object with one thread.
302 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800303 public FlowPusher() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800304 }
305
Naoki Shiotac1601d32013-11-20 10:47:34 -0800306 /**
307 * Initialize object with threads of given number.
308 * @param number_thread Number of threads to handle messages.
309 */
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800310 public FlowPusher(int number_thread) {
311 this.number_thread = number_thread;
312 }
313
Naoki Shiotac1601d32013-11-20 10:47:34 -0800314 /**
315 * Set parameters needed for sending messages.
316 * @param context FloodlightContext used for sending messages.
317 * If null, FlowPusher uses default context.
318 * @param modContext FloodlightModuleContext used for acquiring
319 * ThreadPoolService and registering MessageListener.
320 * @param factory Factory object to create OFMessage objects.
321 * @param damper Message damper used for sending messages.
322 * If null, FlowPusher creates its own damper object.
323 */
324 public void init(FloodlightContext context,
325 FloodlightModuleContext modContext,
326 BasicFactory factory,
327 OFMessageDamper damper) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700328 this.context = context;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800329 this.factory = factory;
Naoki Shiotac1601d32013-11-20 10:47:34 -0800330 this.threadPool = modContext.getServiceImpl(IThreadPoolService.class);
331 IFloodlightProviderService flservice = modContext.getServiceImpl(IFloodlightProviderService.class);
332 flservice.addOFMessageListener(OFType.BARRIER_REPLY, this);
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800333
334 if (damper != null) {
335 messageDamper = damper;
336 } else {
Naoki Shiotab485d412013-11-26 12:04:19 -0800337 // use default values
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800338 messageDamper = new OFMessageDamper(OFMESSAGE_DAMPER_CAPACITY,
339 EnumSet.of(OFType.FLOW_MOD),
340 OFMESSAGE_DAMPER_TIMEOUT);
341 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700342 }
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800343
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800344 /**
345 * Begin processing queue.
346 */
347 public void start() {
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800348 if (factory == null) {
349 log.error("FlowPusher not yet initialized.");
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800350 return;
351 }
352
Naoki Shiota81dbe302013-11-21 15:35:38 -0800353 threadMap = new HashMap<Long,FlowPusherThread>();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800354 for (long i = 0; i < number_thread; ++i) {
Naoki Shiota81dbe302013-11-21 15:35:38 -0800355 FlowPusherThread thread = new FlowPusherThread();
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800356
Naoki Shiota81dbe302013-11-21 15:35:38 -0800357 threadMap.put(i, thread);
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800358 thread.start();
359 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700360 }
361
Brian O'Connor8c166a72013-11-14 18:41:48 -0800362 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800363 public boolean suspend(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800364 SwitchQueue queue = getQueue(sw);
365
366 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800367 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800368 }
369
370 synchronized (queue) {
371 if (queue.state == QueueState.READY) {
372 queue.state = QueueState.SUSPENDED;
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800373 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800374 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800375 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800376 }
377 }
378
Brian O'Connor8c166a72013-11-14 18:41:48 -0800379 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800380 public boolean resume(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800381 SwitchQueue queue = getQueue(sw);
382
383 if (queue == null) {
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800384 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800385 }
386
387 synchronized (queue) {
388 if (queue.state == QueueState.SUSPENDED) {
389 queue.state = QueueState.READY;
Naoki Shiota0aa947e2013-11-27 14:47:35 -0800390
391 // Latch down if queue is not empty
392 FlowPusherThread thread = getProcess(sw);
393 if (! queue.isEmpty() &&
394 thread.mutex.availablePermits() == 0) {
395 thread.mutex.release();
396 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800397 return true;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800398 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800399 return false;
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800400 }
401 }
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800402
Brian O'Connor8c166a72013-11-14 18:41:48 -0800403 @Override
Naoki Shiota8ee48d52013-11-11 15:51:17 -0800404 public boolean isSuspended(IOFSwitch sw) {
405 SwitchQueue queue = getQueue(sw);
406
407 if (queue == null) {
408 // TODO Is true suitable for this case?
409 return true;
410 }
411
412 return (queue.state == QueueState.SUSPENDED);
413 }
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800414
415 /**
Naoki Shiota8739faa2013-11-18 17:00:25 -0800416 * Stop processing queue and exit thread.
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800417 */
418 public void stop() {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800419 if (threadMap == null) {
420 return;
421 }
422
Naoki Shiota81dbe302013-11-21 15:35:38 -0800423 for (FlowPusherThread t : threadMap.values()) {
424 t.interrupt();
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800425 }
426 }
427
Naoki Shiotae3199732013-11-25 16:14:43 -0800428 @Override
Naoki Shiota5c8d19f2013-11-05 15:52:38 -0800429 public void setRate(IOFSwitch sw, long rate) {
430 SwitchQueue queue = getQueue(sw);
431 if (queue == null) {
432 return;
433 }
434
435 if (rate > 0) {
Naoki Shiota2a35b442013-11-26 19:17:38 -0800436 log.debug("rate for {} is set to {}", sw.getId(), rate);
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700437 synchronized (queue) {
438 queue.max_rate = rate;
439 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700440 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700441 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800442
443 @Override
444 public boolean createQueue(IOFSwitch sw) {
445 SwitchQueue queue = getQueue(sw);
446 if (queue != null) {
447 return false;
448 }
449
450 FlowPusherThread proc = getProcess(sw);
451 queue = new SwitchQueue();
452 queue.state = QueueState.READY;
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700453 proc.queues.put(sw, queue);
Naoki Shiotae3199732013-11-25 16:14:43 -0800454
455 return true;
456 }
457
458 @Override
459 public boolean deleteQueue(IOFSwitch sw) {
460 return deleteQueue(sw, false);
461 }
462
463 @Override
Naoki Shiotab485d412013-11-26 12:04:19 -0800464 public boolean deleteQueue(IOFSwitch sw, boolean forceStop) {
Naoki Shiotae3199732013-11-25 16:14:43 -0800465 FlowPusherThread proc = getProcess(sw);
466
Naoki Shiotab485d412013-11-26 12:04:19 -0800467 if (forceStop) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700468 SwitchQueue queue = proc.queues.remove(sw);
469 if (queue == null) {
470 return false;
Naoki Shiotae3199732013-11-25 16:14:43 -0800471 }
472 return true;
473 } else {
474 SwitchQueue queue = getQueue(sw);
475 if (queue == null) {
476 return false;
477 }
478 synchronized (queue) {
479 queue.toBeDeleted = true;
480 }
481 return true;
482 }
483 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700484
Brian O'Connor8c166a72013-11-14 18:41:48 -0800485 @Override
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800486 public boolean add(IOFSwitch sw, OFMessage msg) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700487 return addMessageImpl(sw, msg);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800488 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700489
Brian O'Connor8c166a72013-11-14 18:41:48 -0800490 @Override
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800491 public void pushFlowEntries(
492 Collection<Pair<IOFSwitch, FlowEntry>> entries) {
493
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800494 for (Pair<IOFSwitch, FlowEntry> entry : entries) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700495 add(entry.first, entry.second);
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800496 }
Pavlin Radoslavovab3f8862013-12-04 18:35:53 -0800497 }
498
499 @Override
500 public void pushFlowEntry(IOFSwitch sw, FlowEntry flowEntry) {
501 Collection<Pair<IOFSwitch, FlowEntry>> entries =
502 new LinkedList<Pair<IOFSwitch, FlowEntry>>();
503
504 entries.add(new Pair<IOFSwitch, FlowEntry>(sw, flowEntry));
505 pushFlowEntries(entries);
506 }
507
508 /**
509 * Create a message from FlowEntry and add it to the queue of the switch.
510 * @param sw Switch to which message is pushed.
511 * @param flowEntry FlowEntry object used for creating message.
512 * @return true if message is successfully added to a queue.
513 */
514 private boolean add(IOFSwitch sw, FlowEntry flowEntry) {
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800515 //
516 // Create the OpenFlow Flow Modification Entry to push
517 //
518 OFFlowMod fm = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
519 long cookie = flowEntry.flowEntryId().value();
520
521 short flowModCommand = OFFlowMod.OFPFC_ADD;
522 if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_ADD) {
523 flowModCommand = OFFlowMod.OFPFC_ADD;
524 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_MODIFY) {
525 flowModCommand = OFFlowMod.OFPFC_MODIFY_STRICT;
526 } else if (flowEntry.flowEntryUserState() == FlowEntryUserState.FE_USER_DELETE) {
527 flowModCommand = OFFlowMod.OFPFC_DELETE_STRICT;
528 } else {
529 // Unknown user state. Ignore the entry
530 log.debug(
531 "Flow Entry ignored (FlowEntryId = {}): unknown user state {}",
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800532 flowEntry.flowEntryId(),
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800533 flowEntry.flowEntryUserState());
534 return false;
535 }
536
537 //
538 // Fetch the match conditions.
539 //
540 // NOTE: The Flow matching conditions common for all Flow Entries are
541 // used ONLY if a Flow Entry does NOT have the corresponding matching
542 // condition set.
543 //
544 OFMatch match = new OFMatch();
545 match.setWildcards(OFMatch.OFPFW_ALL);
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800546 FlowEntryMatch flowEntryMatch = flowEntry.flowEntryMatch();
547
548 // Match the Incoming Port
549 Port matchInPort = flowEntryMatch.inPort();
550 if (matchInPort != null) {
551 match.setInputPort(matchInPort.value());
552 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_IN_PORT);
553 }
554
555 // Match the Source MAC address
556 MACAddress matchSrcMac = flowEntryMatch.srcMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800557 if (matchSrcMac != null) {
558 match.setDataLayerSource(matchSrcMac.toString());
559 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_SRC);
560 }
561
562 // Match the Destination MAC address
563 MACAddress matchDstMac = flowEntryMatch.dstMac();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800564 if (matchDstMac != null) {
565 match.setDataLayerDestination(matchDstMac.toString());
566 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_DST);
567 }
568
569 // Match the Ethernet Frame Type
570 Short matchEthernetFrameType = flowEntryMatch.ethernetFrameType();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800571 if (matchEthernetFrameType != null) {
572 match.setDataLayerType(matchEthernetFrameType);
573 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_TYPE);
574 }
575
576 // Match the VLAN ID
577 Short matchVlanId = flowEntryMatch.vlanId();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800578 if (matchVlanId != null) {
579 match.setDataLayerVirtualLan(matchVlanId);
580 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_DL_VLAN);
581 }
582
583 // Match the VLAN priority
584 Byte matchVlanPriority = flowEntryMatch.vlanPriority();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800585 if (matchVlanPriority != null) {
586 match.setDataLayerVirtualLanPriorityCodePoint(matchVlanPriority);
587 match.setWildcards(match.getWildcards()
588 & ~OFMatch.OFPFW_DL_VLAN_PCP);
589 }
590
591 // Match the Source IPv4 Network prefix
592 IPv4Net matchSrcIPv4Net = flowEntryMatch.srcIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800593 if (matchSrcIPv4Net != null) {
594 match.setFromCIDR(matchSrcIPv4Net.toString(), OFMatch.STR_NW_SRC);
595 }
596
597 // Natch the Destination IPv4 Network prefix
598 IPv4Net matchDstIPv4Net = flowEntryMatch.dstIPv4Net();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800599 if (matchDstIPv4Net != null) {
600 match.setFromCIDR(matchDstIPv4Net.toString(), OFMatch.STR_NW_DST);
601 }
602
603 // Match the IP protocol
604 Byte matchIpProto = flowEntryMatch.ipProto();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800605 if (matchIpProto != null) {
606 match.setNetworkProtocol(matchIpProto);
607 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_PROTO);
608 }
609
610 // Match the IP ToS (DSCP field, 6 bits)
611 Byte matchIpToS = flowEntryMatch.ipToS();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800612 if (matchIpToS != null) {
613 match.setNetworkTypeOfService(matchIpToS);
614 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_NW_TOS);
615 }
616
617 // Match the Source TCP/UDP port
618 Short matchSrcTcpUdpPort = flowEntryMatch.srcTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800619 if (matchSrcTcpUdpPort != null) {
620 match.setTransportSource(matchSrcTcpUdpPort);
621 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_SRC);
622 }
623
624 // Match the Destination TCP/UDP port
625 Short matchDstTcpUdpPort = flowEntryMatch.dstTcpUdpPort();
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800626 if (matchDstTcpUdpPort != null) {
627 match.setTransportDestination(matchDstTcpUdpPort);
628 match.setWildcards(match.getWildcards() & ~OFMatch.OFPFW_TP_DST);
629 }
630
631 //
632 // Fetch the actions
633 //
634 Short actionOutputPort = null;
635 List<OFAction> openFlowActions = new ArrayList<OFAction>();
636 int actionsLen = 0;
637 FlowEntryActions flowEntryActions = flowEntry.flowEntryActions();
638 //
639 for (FlowEntryAction action : flowEntryActions.actions()) {
640 ActionOutput actionOutput = action.actionOutput();
641 ActionSetVlanId actionSetVlanId = action.actionSetVlanId();
642 ActionSetVlanPriority actionSetVlanPriority = action
643 .actionSetVlanPriority();
644 ActionStripVlan actionStripVlan = action.actionStripVlan();
645 ActionSetEthernetAddr actionSetEthernetSrcAddr = action
646 .actionSetEthernetSrcAddr();
647 ActionSetEthernetAddr actionSetEthernetDstAddr = action
648 .actionSetEthernetDstAddr();
649 ActionSetIPv4Addr actionSetIPv4SrcAddr = action
650 .actionSetIPv4SrcAddr();
651 ActionSetIPv4Addr actionSetIPv4DstAddr = action
652 .actionSetIPv4DstAddr();
653 ActionSetIpToS actionSetIpToS = action.actionSetIpToS();
654 ActionSetTcpUdpPort actionSetTcpUdpSrcPort = action
655 .actionSetTcpUdpSrcPort();
656 ActionSetTcpUdpPort actionSetTcpUdpDstPort = action
657 .actionSetTcpUdpDstPort();
658 ActionEnqueue actionEnqueue = action.actionEnqueue();
659
660 if (actionOutput != null) {
661 actionOutputPort = actionOutput.port().value();
662 // XXX: The max length is hard-coded for now
663 OFActionOutput ofa = new OFActionOutput(actionOutput.port()
664 .value(), (short) 0xffff);
665 openFlowActions.add(ofa);
666 actionsLen += ofa.getLength();
667 }
668
669 if (actionSetVlanId != null) {
670 OFActionVirtualLanIdentifier ofa = new OFActionVirtualLanIdentifier(
671 actionSetVlanId.vlanId());
672 openFlowActions.add(ofa);
673 actionsLen += ofa.getLength();
674 }
675
676 if (actionSetVlanPriority != null) {
677 OFActionVirtualLanPriorityCodePoint ofa = new OFActionVirtualLanPriorityCodePoint(
678 actionSetVlanPriority.vlanPriority());
679 openFlowActions.add(ofa);
680 actionsLen += ofa.getLength();
681 }
682
683 if (actionStripVlan != null) {
684 if (actionStripVlan.stripVlan() == true) {
685 OFActionStripVirtualLan ofa = new OFActionStripVirtualLan();
686 openFlowActions.add(ofa);
687 actionsLen += ofa.getLength();
688 }
689 }
690
691 if (actionSetEthernetSrcAddr != null) {
692 OFActionDataLayerSource ofa = new OFActionDataLayerSource(
693 actionSetEthernetSrcAddr.addr().toBytes());
694 openFlowActions.add(ofa);
695 actionsLen += ofa.getLength();
696 }
697
698 if (actionSetEthernetDstAddr != null) {
699 OFActionDataLayerDestination ofa = new OFActionDataLayerDestination(
700 actionSetEthernetDstAddr.addr().toBytes());
701 openFlowActions.add(ofa);
702 actionsLen += ofa.getLength();
703 }
704
705 if (actionSetIPv4SrcAddr != null) {
706 OFActionNetworkLayerSource ofa = new OFActionNetworkLayerSource(
707 actionSetIPv4SrcAddr.addr().value());
708 openFlowActions.add(ofa);
709 actionsLen += ofa.getLength();
710 }
711
712 if (actionSetIPv4DstAddr != null) {
713 OFActionNetworkLayerDestination ofa = new OFActionNetworkLayerDestination(
714 actionSetIPv4DstAddr.addr().value());
715 openFlowActions.add(ofa);
716 actionsLen += ofa.getLength();
717 }
718
719 if (actionSetIpToS != null) {
720 OFActionNetworkTypeOfService ofa = new OFActionNetworkTypeOfService(
721 actionSetIpToS.ipToS());
722 openFlowActions.add(ofa);
723 actionsLen += ofa.getLength();
724 }
725
726 if (actionSetTcpUdpSrcPort != null) {
727 OFActionTransportLayerSource ofa = new OFActionTransportLayerSource(
728 actionSetTcpUdpSrcPort.port());
729 openFlowActions.add(ofa);
730 actionsLen += ofa.getLength();
731 }
732
733 if (actionSetTcpUdpDstPort != null) {
734 OFActionTransportLayerDestination ofa = new OFActionTransportLayerDestination(
735 actionSetTcpUdpDstPort.port());
736 openFlowActions.add(ofa);
737 actionsLen += ofa.getLength();
738 }
739
740 if (actionEnqueue != null) {
741 OFActionEnqueue ofa = new OFActionEnqueue(actionEnqueue.port()
742 .value(), actionEnqueue.queueId());
743 openFlowActions.add(ofa);
744 actionsLen += ofa.getLength();
745 }
746 }
747
Pavlin Radoslavov1fe06a22013-12-10 14:12:23 -0800748 fm.setIdleTimeout((short)flowEntry.idleTimeout())
749 .setHardTimeout((short)flowEntry.hardTimeout())
Pavlin Radoslavovafbf1032014-02-04 10:37:52 -0800750 .setPriority((short)flowEntry.priority())
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800751 .setBufferId(OFPacketOut.BUFFER_ID_NONE).setCookie(cookie)
752 .setCommand(flowModCommand).setMatch(match)
753 .setActions(openFlowActions)
754 .setLengthU(OFFlowMod.MINIMUM_LENGTH + actionsLen);
755 fm.setOutPort(OFPort.OFPP_NONE.getValue());
756 if ((flowModCommand == OFFlowMod.OFPFC_DELETE)
757 || (flowModCommand == OFFlowMod.OFPFC_DELETE_STRICT)) {
758 if (actionOutputPort != null)
759 fm.setOutPort(actionOutputPort);
760 }
761
762 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800763 // Set the OFPFF_SEND_FLOW_REM flag if the Flow Entry is not
764 // permanent.
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800765 //
Pavlin Radoslavovb95379c2013-12-13 11:09:20 -0800766 if ((flowEntry.idleTimeout() != 0) ||
767 (flowEntry.hardTimeout() != 0)) {
768 fm.setFlags(OFFlowMod.OFPFF_SEND_FLOW_REM);
769 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800770
771 //
772 // Write the message to the switch
773 //
Yuta HIGUCHI5302ddf2014-01-06 12:53:35 -0800774 log.debug("Installing flow entry {} into switch DPID: {} flowEntryId: {} srcMac: {} dstMac: {} inPort: {} outPort: {}"
775 , flowEntry.flowEntryUserState()
776 , sw.getStringId()
777 , flowEntry.flowEntryId()
778 , matchSrcMac
779 , matchDstMac
780 , matchInPort
781 , actionOutputPort
782 );
783
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700784 return addMessageImpl(sw, fm);
785 }
786
787 /**
788 * Add message to queue
789 * @param sw
790 * @param msg
791 * @param flowEntryId
792 * @return
793 */
794 protected boolean addMessageImpl(IOFSwitch sw, OFMessage msg) {
795 FlowPusherThread proc = getProcess(sw);
796 SwitchQueue queue = proc.queues.get(sw);
797
798 // create queue at first addition of message
799 if (queue == null) {
800 createQueue(sw);
801 queue = getQueue(sw);
802 }
803
804 SwitchQueueEntry entry = new SwitchQueueEntry(msg);
805 synchronized (queue) {
806 queue.add(entry);
807 if (log.isTraceEnabled()) {
808 log.trace("Message is pushed : {}", entry.getOFMessage());
809 }
810 }
811
812 if (proc.mutex.availablePermits() == 0) {
813 proc.mutex.release();
814 }
815
816 return true;
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800817 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800818
819 @Override
820 public OFBarrierReply barrier(IOFSwitch sw) {
821 OFMessageFuture<OFBarrierReply> future = barrierAsync(sw);
822 if (future == null) {
823 return null;
824 }
825
826 try {
827 return future.get();
828 } catch (InterruptedException e) {
829 e.printStackTrace();
830 log.error("InterruptedException: {}", e);
831 return null;
832 } catch (ExecutionException e) {
833 e.printStackTrace();
834 log.error("ExecutionException: {}", e);
835 return null;
836 }
837 }
Naoki Shiota2e2fc2b2013-11-12 11:21:36 -0800838
Naoki Shiotac1601d32013-11-20 10:47:34 -0800839 @Override
840 public OFBarrierReplyFuture barrierAsync(IOFSwitch sw) {
841 // TODO creation of message and future should be moved to OFSwitchImpl
Naoki Shiota81dbe302013-11-21 15:35:38 -0800842
843 if (sw == null) {
844 return null;
845 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800846
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700847 OFBarrierRequest msg = createBarrierRequest(sw);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800848
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700849 OFBarrierReplyFuture future = new OFBarrierReplyFuture(threadPool, sw, msg.getXid());
850 barrierFutures.put(BarrierInfo.create(sw,msg), future);
851
852 addMessageImpl(sw, msg);
Naoki Shiotac2ec5592013-11-27 12:10:33 -0800853
Naoki Shiotac1601d32013-11-20 10:47:34 -0800854 return future;
855 }
856
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700857 protected OFBarrierRequest createBarrierRequest(IOFSwitch sw) {
858 OFBarrierRequest msg = (OFBarrierRequest) factory.getMessage(OFType.BARRIER_REQUEST);
859 msg.setXid(sw.getNextTransactionId());
860
861 return msg;
862 }
863
Naoki Shiotae3199732013-11-25 16:14:43 -0800864 /**
865 * Get a queue attached to a switch.
866 * @param sw Switch object
867 * @return Queue object
868 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800869 protected SwitchQueue getQueue(IOFSwitch sw) {
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800870 if (sw == null) {
871 return null;
872 }
873
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800874 return getProcess(sw).queues.get(sw);
875 }
876
Naoki Shiotae3199732013-11-25 16:14:43 -0800877 /**
878 * Get a hash value correspondent to a switch.
879 * @param sw Switch object
880 * @return Hash value
881 */
Naoki Shiotac1601d32013-11-20 10:47:34 -0800882 protected long getHash(IOFSwitch sw) {
883 // This code assumes DPID is sequentially assigned.
884 // TODO consider equalization algorithm
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800885 return sw.getId() % number_thread;
886 }
Naoki Shiotae3199732013-11-25 16:14:43 -0800887
888 /**
889 * Get a Thread object which processes the queue attached to a switch.
890 * @param sw Switch object
891 * @return Thread object
892 */
Naoki Shiota81dbe302013-11-21 15:35:38 -0800893 protected FlowPusherThread getProcess(IOFSwitch sw) {
Naoki Shiotaf0cddbf2013-11-12 14:59:43 -0800894 long hash = getHash(sw);
895
896 return threadMap.get(hash);
Naoki Shiota7d0cf272013-11-05 10:18:12 -0800897 }
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700898
Naoki Shiotac1601d32013-11-20 10:47:34 -0800899 @Override
900 public String getName() {
901 return "flowpusher";
902 }
903
904 @Override
905 public boolean isCallbackOrderingPrereq(OFType type, String name) {
906 return false;
907 }
908
909 @Override
910 public boolean isCallbackOrderingPostreq(OFType type, String name) {
911 return false;
912 }
913
914 @Override
915 public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700916 if (log.isTraceEnabled()) {
917 log.trace("Received BARRIER_REPLY from : {}", sw.getId());
918 }
919
920 if (msg.getType() != OFType.BARRIER_REPLY) {
921 log.error("Unexpected reply message : {}", msg.getType());
Naoki Shiotac1601d32013-11-20 10:47:34 -0800922 return Command.CONTINUE;
923 }
924
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700925 OFBarrierReply reply = (OFBarrierReply) msg;
926 BarrierInfo info = BarrierInfo.create(sw,reply);
Naoki Shiotac1601d32013-11-20 10:47:34 -0800927
Naoki Shiotad6ef3b32014-03-13 18:42:23 -0700928 // Deliver future if exists
929 OFBarrierReplyFuture future = barrierFutures.get(info);
930 if (future != null) {
931 future.deliverFuture(sw, msg);
932 barrierFutures.remove(info);
933 }
Naoki Shiotac1601d32013-11-20 10:47:34 -0800934
935 return Command.CONTINUE;
936 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700937}