blob: 54da3fdb7d8d50557808ba20a30edd0dbe07611b [file] [log] [blame]
Naoki Shiotaed4eb5e2013-10-31 10:55:32 -07001package net.onrc.onos.ofcontroller.flowmanager;
2
3import groovy.util.MapEntry;
4
5import java.io.IOException;
6import java.util.ArrayList;
7import java.util.HashMap;
8import java.util.List;
9import java.util.Map;
10import java.util.Queue;
11import java.util.Set;
12
13import org.openflow.protocol.OFMessage;
14import org.openflow.protocol.OFType;
15
16import net.floodlightcontroller.core.FloodlightContext;
17import net.floodlightcontroller.core.IOFSwitch;
18import net.floodlightcontroller.util.OFMessageDamper;
19
20/**
21 * FlowPusher intermediates flow_mod sent from FlowManager/FlowSync to switches.
22 * FlowPusher controls the rate of sending flow_mods so that connection doesn't overflow.
23 * @author Naoki Shiota
24 *
25 */
26public class FlowPusher {
27 private FloodlightContext context;
28 private Thread thread;
29
30 private static class QueueState {
31 long last_sent_time = 0;
32 long last_sent_size = 0;
33 long max_rate;
34 }
35
36 private Map<IOFSwitch, Queue<OFMessage> > queues =
37 new HashMap<IOFSwitch, Queue<OFMessage> >();
38 private Map<Queue<OFMessage>, QueueState> queue_states =
39 new HashMap<Queue<OFMessage>, QueueState>();
40
41 private class FlowPusherProcess implements Runnable {
42 @Override
43 public void run() {
44 while (true) {
45 for (Map.Entry<IOFSwitch, Queue<OFMessage> > entry : queues.entrySet()) {
46 // pick one FlowEntry event from a queue
47 IOFSwitch sw = entry.getKey();
48 Queue<OFMessage> queue = entry.getValue();
49 if (entry == null || queue == null) {
50 continue;
51 }
52
53 OFMessage msg = queue.poll();
54 if (msg == null) {
55 continue;
56 }
57
58 QueueState state = queue_states.get(queue);
59 if (state == null) {
60 continue;
61 }
62
63 // check sending rate and determine it to be sent or not
64 long current_time = System.nanoTime();
65 long rate = state.last_sent_size / (current_time - state.last_sent_time);
66
67 // if need to send, call IOFSwitch#write()
68 if (rate < state.max_rate) {
69 try {
70 sw.write(msg, context);
71 state.last_sent_time = current_time;
72 state.last_sent_size = msg.getLengthU();
73 } catch (IOException e) {
74 // TODO Auto-generated catch block
75 e.printStackTrace();
76 }
77 }
78 }
79
80 // sleep while all queues are empty
81 boolean sleep = true;
82 do {
83 // TODO
84 } while (sleep);
85 }
86 }
87 }
88
89 public FlowPusher(FloodlightContext context) {
90 this.context = context;
91 }
92
93 public void assignQueue(IOFSwitch sw, Queue<OFMessage> queue) {
94 queues.put(sw, queue);
95 }
96
97 public void startProcess() {
98 thread = new Thread(new FlowPusherProcess());
99 thread.start();
100 }
101
102 public void stopProcess() {
103 // TODO tell thread to halt
104 }
105}