blob: 903295e048cc043ec813652ff4d5fa7ad80b54e2 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001package net.floodlightcontroller.packetstreamer;
2
3import net.floodlightcontroller.core.annotations.LogMessageCategory;
4import net.floodlightcontroller.core.annotations.LogMessageDoc;
5import net.floodlightcontroller.core.annotations.LogMessageDocs;
6import net.floodlightcontroller.packetstreamer.thrift.*;
7
8import java.nio.ByteBuffer;
9import java.util.ArrayList;
10import java.util.Map;
11import java.util.List;
12import java.util.concurrent.BlockingQueue;
13import java.util.concurrent.ConcurrentHashMap;
14import java.util.concurrent.LinkedBlockingQueue;
15
16import org.slf4j.Logger;
17import org.slf4j.LoggerFactory;
18
19/**
20 * The PacketStreamer handler class that implements the service APIs.
21 */
22@LogMessageCategory("OpenFlow Message Tracing")
23public class PacketStreamerHandler implements PacketStreamer.Iface {
24
25 /**
26 * The queue wrapper class that contains the queue for the streamed packets.
27 */
28 protected class SessionQueue {
29 protected BlockingQueue<ByteBuffer> pQueue;
30
31 /**
32 * The queue wrapper constructor
33 */
34 public SessionQueue() {
35 this.pQueue = new LinkedBlockingQueue<ByteBuffer>();
36 }
37
38 /**
39 * The access method to get to the internal queue.
40 */
41 public BlockingQueue<ByteBuffer> getQueue() {
42 return this.pQueue;
43 }
44 }
45
46 /**
47 * The class logger object
48 */
49 protected static Logger log =
50 LoggerFactory.getLogger(PacketStreamerServer.class);
51
52 /**
53 * A sessionId-to-queue mapping
54 */
55 protected Map<String, SessionQueue> msgQueues;
56
57 /**
58 * The handler's constructor
59 */
60 public PacketStreamerHandler() {
61 this.msgQueues = new ConcurrentHashMap<String, SessionQueue>();
62 }
63
64 /**
65 * The implementation for getPackets() function.
66 * This is a blocking API.
67 *
68 * @param sessionid
69 * @return A list of packets associated with the session
70 */
71 @Override
72 @LogMessageDocs({
73 @LogMessageDoc(level="ERROR",
74 message="Interrupted while waiting for session start",
75 explanation="The thread was interrupted waiting " +
76 "for the packet streamer session to start",
77 recommendation=LogMessageDoc.CHECK_CONTROLLER),
78 @LogMessageDoc(level="ERROR",
79 message="Interrupted while waiting for packets",
80 explanation="The thread was interrupted waiting " +
81 "for packets",
82 recommendation=LogMessageDoc.CHECK_CONTROLLER)
83 })
84 public List<ByteBuffer> getPackets(String sessionid)
85 throws org.apache.thrift.TException {
86 List<ByteBuffer> packets = new ArrayList<ByteBuffer>();
87 int count = 0;
88
89 while (!msgQueues.containsKey(sessionid) && count++ < 100) {
90 log.debug("Queue for session {} doesn't exist yet.", sessionid);
91 try {
92 Thread.sleep(100); // Wait 100 ms to check again.
93 } catch (InterruptedException e) {
94 log.error("Interrupted while waiting for session start");
95 }
96 }
97
98 if (count < 100) {
99 SessionQueue pQueue = msgQueues.get(sessionid);
100 BlockingQueue<ByteBuffer> queue = pQueue.getQueue();
101 // Block if queue is empty
102 try {
103 packets.add(queue.take());
104 queue.drainTo(packets);
105 } catch (InterruptedException e) {
106 log.error("Interrupted while waiting for packets");
107 }
108 }
109
110 return packets;
111 }
112
113 /**
114 * The implementation for pushMessageSync() function.
115 *
116 * @param msg
117 * @return 1 for success, 0 for failure
118 * @throws TException
119 */
120 @Override
121 @LogMessageDocs({
122 @LogMessageDoc(level="ERROR",
123 message="Could not push empty message",
124 explanation="An empty message was sent to the packet streamer",
125 recommendation=LogMessageDoc.REPORT_CONTROLLER_BUG),
126 @LogMessageDoc(level="ERROR",
127 message="queue for session {sessionId} is null",
128 explanation="The queue for the packet streamer session " +
129 "is missing",
130 recommendation=LogMessageDoc.REPORT_CONTROLLER_BUG)
131 })
132
133 public int pushMessageSync(Message msg)
134 throws org.apache.thrift.TException {
135
136 if (msg == null) {
137 log.error("Could not push empty message");
138 return 0;
139 }
140
141 List<String> sessionids = msg.getSessionIDs();
142 for (String sid : sessionids) {
143 SessionQueue pQueue = null;
144
145 if (!msgQueues.containsKey(sid)) {
146 pQueue = new SessionQueue();
147 msgQueues.put(sid, pQueue);
148 } else {
149 pQueue = msgQueues.get(sid);
150 }
151
152 log.debug("pushMessageSync: SessionId: " + sid +
153 " Receive a message, " + msg.toString() + "\n");
154 ByteBuffer bb = ByteBuffer.wrap(msg.getPacket().getData());
155 //ByteBuffer dst = ByteBuffer.wrap(msg.getPacket().toString().getBytes());
156 BlockingQueue<ByteBuffer> queue = pQueue.getQueue();
157 if (queue != null) {
158 if (!queue.offer(bb)) {
159 log.error("Failed to queue message for session: " + sid);
160 } else {
161 log.debug("insert a message to session: " + sid);
162 }
163 } else {
164 log.error("queue for session {} is null", sid);
165 }
166 }
167
168 return 1;
169 }
170
171 /**
172 * The implementation for pushMessageAsync() function.
173 *
174 * @param msg
175 * @throws TException
176 */
177 @Override
178 public void pushMessageAsync(Message msg)
179 throws org.apache.thrift.TException {
180 pushMessageSync(msg);
181 return;
182 }
183
184 /**
185 * The implementation for terminateSession() function.
186 * It removes the session to queue association.
187 * @param sessionid
188 * @throws TException
189 */
190 @Override
191 public void terminateSession(String sessionid)
192 throws org.apache.thrift.TException {
193 if (!msgQueues.containsKey(sessionid)) {
194 return;
195 }
196
197 SessionQueue pQueue = msgQueues.get(sessionid);
198
199 log.debug("terminateSession: SessionId: " + sessionid + "\n");
200 String data = "FilterTimeout";
201 ByteBuffer bb = ByteBuffer.wrap(data.getBytes());
202 BlockingQueue<ByteBuffer> queue = pQueue.getQueue();
203 if (queue != null) {
204 if (!queue.offer(bb)) {
205 log.error("Failed to queue message for session: " + sessionid);
206 }
207 msgQueues.remove(sessionid);
208 } else {
209 log.error("queue for session {} is null", sessionid);
210 }
211 }
212}
213