blob: 07cb719a91e1fb1fb93420509bf9325c14fc337f [file] [log] [blame]
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001package net.onrc.onos.ofcontroller.flowmanager;
2
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07003import java.io.IOException;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07004import java.util.HashMap;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07005import java.util.Map;
6import java.util.Queue;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07007
8import org.openflow.protocol.OFMessage;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07009
10import net.floodlightcontroller.core.FloodlightContext;
11import net.floodlightcontroller.core.IOFSwitch;
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070012
13/**
14 * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
15 * FlowPusher controls the rate of sending flow_mods so that connection doesn't overflow.
16 * @author Naoki Shiota
17 *
18 */
19public class FlowPusher {
20 private FloodlightContext context;
21 private Thread thread;
22
Naoki Shiotacf1acca2013-10-31 11:40:32 -070023 /**
24 * Represents state of queue.
25 * This is used for calculation of rate.
26 * @author Naoki Shiota
27 *
28 */
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070029 private static class QueueState {
30 long last_sent_time = 0;
31 long last_sent_size = 0;
32 long max_rate;
33 }
Naoki Shiotacf1acca2013-10-31 11:40:32 -070034
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070035 private Map<IOFSwitch, Queue<OFMessage> > queues =
36 new HashMap<IOFSwitch, Queue<OFMessage> >();
37 private Map<Queue<OFMessage>, QueueState> queue_states =
38 new HashMap<Queue<OFMessage>, QueueState>();
39
40 private class FlowPusherProcess implements Runnable {
41 @Override
42 public void run() {
43 while (true) {
44 for (Map.Entry<IOFSwitch, Queue<OFMessage> > entry : queues.entrySet()) {
45 // pick one FlowEntry event from a queue
46 IOFSwitch sw = entry.getKey();
47 Queue<OFMessage> queue = entry.getValue();
48 if (entry == null || queue == null) {
49 continue;
50 }
51
52 OFMessage msg = queue.poll();
53 if (msg == null) {
54 continue;
55 }
56
57 QueueState state = queue_states.get(queue);
58 if (state == null) {
59 continue;
60 }
61
62 // check sending rate and determine it to be sent or not
63 long current_time = System.nanoTime();
64 long rate = state.last_sent_size / (current_time - state.last_sent_time);
65
66 // if need to send, call IOFSwitch#write()
67 if (rate < state.max_rate) {
68 try {
69 sw.write(msg, context);
70 state.last_sent_time = current_time;
71 state.last_sent_size = msg.getLengthU();
72 } catch (IOException e) {
73 // TODO Auto-generated catch block
74 e.printStackTrace();
75 }
76 }
77 }
78
79 // sleep while all queues are empty
80 boolean sleep = true;
81 do {
Naoki Shiotacf1acca2013-10-31 11:40:32 -070082 // TODO check if queues are empty
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070083 } while (sleep);
84 }
85 }
86 }
87
88 public FlowPusher(FloodlightContext context) {
89 this.context = context;
90 }
91
Naoki Shiotacf1acca2013-10-31 11:40:32 -070092 public void assignQueue(IOFSwitch sw, Queue<OFMessage> queue, long max_rate) {
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070093 queues.put(sw, queue);
Naoki Shiotacf1acca2013-10-31 11:40:32 -070094 QueueState state = new QueueState();
95 state.max_rate = max_rate;
96 queue_states.put(queue, state);
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -070097 }
98
99 public void startProcess() {
100 thread = new Thread(new FlowPusherProcess());
101 thread.start();
102 }
103
104 public void stopProcess() {
105 // TODO tell thread to halt
106 }
107}