blob: 940620e65ea121243127afcd9af09b3572ec66b5 [file] [log] [blame]
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001package net.onrc.onos.ofcontroller.flowmanager;
2
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07004import java.util.HashMap;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07005import java.util.Map;
6import java.util.Queue;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007
8import org.openflow.protocol.OFMessage;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009
10import net.floodlightcontroller.core.FloodlightContext;
11import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012
13/**
14 * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
15 * FlowPusher controls the rate of sending flow_mods so that connection doesn't overflow.
16 * @author Naoki Shiota
17 *
18 */
19public class FlowPusher {
20 private FloodlightContext context;
Naoki Shiotac2a699a2013-10-31 15:36:01 -070021 private FlowQueueTable flowQueueTable = null;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070022 private Thread thread;
23
Naoki Shiotacf1acca2013-10-31 11:40:32 -070024 /**
25 * Represents state of queue.
26 * This is used for calculation of rate.
27 * @author Naoki Shiota
28 *
29 */
Naoki Shiotac2a699a2013-10-31 15:36:01 -070030 private static class RateInfo {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070031 long last_sent_time = 0;
32 long last_sent_size = 0;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070033 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070034
Naoki Shiotac2a699a2013-10-31 15:36:01 -070035 private Map<Long, RateInfo> queue_rateinfos =
36 new HashMap<Long, RateInfo>();
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070037
38 private class FlowPusherProcess implements Runnable {
39 @Override
40 public void run() {
Naoki Shiotac2a699a2013-10-31 15:36:01 -070041 if (flowQueueTable == null) {
42 return;
43 }
44
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070045 while (true) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -070046 for (IOFSwitch sw : flowQueueTable.getSwitches()) {
47 // Skip if queue is suspended
48 if (flowQueueTable.isQueueSusupended(sw)) {
49 continue;
50 }
51
52 // Skip if queue is locked
53 if (! flowQueueTable.lockQueueIfAvailable(sw)) {
54 continue;
55 }
56
57 long dpid = sw.getId();
58 Queue<OFMessage> queue = flowQueueTable.getQueue(sw);
59
60 if (queue == null) {
61 flowQueueTable.unlockQueue(sw);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070062 continue;
63 }
64
65 OFMessage msg = queue.poll();
66 if (msg == null) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -070067 flowQueueTable.unlockQueue(sw);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070068 continue;
69 }
70
Naoki Shiotac2a699a2013-10-31 15:36:01 -070071 RateInfo state = queue_rateinfos.get(dpid);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070072 if (state == null) {
Naoki Shiotac2a699a2013-10-31 15:36:01 -070073 queue_rateinfos.put(dpid, new RateInfo());
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070074 }
75
76 // check sending rate and determine it to be sent or not
77 long current_time = System.nanoTime();
78 long rate = state.last_sent_size / (current_time - state.last_sent_time);
79
80 // if need to send, call IOFSwitch#write()
Naoki Shiotac2a699a2013-10-31 15:36:01 -070081 if (rate < flowQueueTable.getQueueRate(sw)) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070082 try {
83 sw.write(msg, context);
84 state.last_sent_time = current_time;
85 state.last_sent_size = msg.getLengthU();
86 } catch (IOException e) {
87 // TODO Auto-generated catch block
88 e.printStackTrace();
89 }
90 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -070091
92 flowQueueTable.unlockQueue(sw);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070093 }
94
95 // sleep while all queues are empty
96 boolean sleep = true;
97 do {
Naoki Shiotacf1acca2013-10-31 11:40:32 -070098 // TODO check if queues are empty
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070099 } while (sleep);
100 }
101 }
102 }
103
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700104 public FlowPusher(FlowQueueTable table, FloodlightContext context) {
105 flowQueueTable = table;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700106 this.context = context;
107 }
108
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700109 public void startProcess() {
110 thread = new Thread(new FlowPusherProcess());
111 thread.start();
112 }
113
114 public void stopProcess() {
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700115 if (thread != null && thread.isAlive()) {
116 // TODO tell thread to halt
117 }
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700118 }
Naoki Shiotac2a699a2013-10-31 15:36:01 -0700119
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -0700120}