Umesh Krishnaswamy | 345ee99 | 2012-12-13 20:29:48 -0800 | [diff] [blame] | 1 | package net.floodlightcontroller.packetstreamer; |
| 2 | |
| 3 | import net.floodlightcontroller.core.annotations.LogMessageCategory; |
| 4 | import net.floodlightcontroller.core.annotations.LogMessageDoc; |
| 5 | import net.floodlightcontroller.core.annotations.LogMessageDocs; |
| 6 | import net.floodlightcontroller.packetstreamer.thrift.*; |
| 7 | |
| 8 | import java.nio.ByteBuffer; |
| 9 | import java.util.ArrayList; |
| 10 | import java.util.Map; |
| 11 | import java.util.List; |
| 12 | import java.util.concurrent.BlockingQueue; |
| 13 | import java.util.concurrent.ConcurrentHashMap; |
| 14 | import java.util.concurrent.LinkedBlockingQueue; |
| 15 | |
| 16 | import org.slf4j.Logger; |
| 17 | import org.slf4j.LoggerFactory; |
| 18 | |
| 19 | /** |
| 20 | * The PacketStreamer handler class that implements the service APIs. |
| 21 | */ |
| 22 | @LogMessageCategory("OpenFlow Message Tracing") |
| 23 | public class PacketStreamerHandler implements PacketStreamer.Iface { |
| 24 | |
| 25 | /** |
| 26 | * The queue wrapper class that contains the queue for the streamed packets. |
| 27 | */ |
| 28 | protected class SessionQueue { |
| 29 | protected BlockingQueue<ByteBuffer> pQueue; |
| 30 | |
| 31 | /** |
| 32 | * The queue wrapper constructor |
| 33 | */ |
| 34 | public SessionQueue() { |
| 35 | this.pQueue = new LinkedBlockingQueue<ByteBuffer>(); |
| 36 | } |
| 37 | |
| 38 | /** |
| 39 | * The access method to get to the internal queue. |
| 40 | */ |
| 41 | public BlockingQueue<ByteBuffer> getQueue() { |
| 42 | return this.pQueue; |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | /** |
| 47 | * The class logger object |
| 48 | */ |
| 49 | protected static Logger log = |
| 50 | LoggerFactory.getLogger(PacketStreamerServer.class); |
| 51 | |
| 52 | /** |
| 53 | * A sessionId-to-queue mapping |
| 54 | */ |
| 55 | protected Map<String, SessionQueue> msgQueues; |
| 56 | |
| 57 | /** |
| 58 | * The handler's constructor |
| 59 | */ |
| 60 | public PacketStreamerHandler() { |
| 61 | this.msgQueues = new ConcurrentHashMap<String, SessionQueue>(); |
| 62 | } |
| 63 | |
| 64 | /** |
| 65 | * The implementation for getPackets() function. |
| 66 | * This is a blocking API. |
| 67 | * |
| 68 | * @param sessionid |
| 69 | * @return A list of packets associated with the session |
| 70 | */ |
| 71 | @Override |
| 72 | @LogMessageDocs({ |
| 73 | @LogMessageDoc(level="ERROR", |
| 74 | message="Interrupted while waiting for session start", |
| 75 | explanation="The thread was interrupted waiting " + |
| 76 | "for the packet streamer session to start", |
| 77 | recommendation=LogMessageDoc.CHECK_CONTROLLER), |
| 78 | @LogMessageDoc(level="ERROR", |
| 79 | message="Interrupted while waiting for packets", |
| 80 | explanation="The thread was interrupted waiting " + |
| 81 | "for packets", |
| 82 | recommendation=LogMessageDoc.CHECK_CONTROLLER) |
| 83 | }) |
| 84 | public List<ByteBuffer> getPackets(String sessionid) |
| 85 | throws org.apache.thrift.TException { |
| 86 | List<ByteBuffer> packets = new ArrayList<ByteBuffer>(); |
| 87 | int count = 0; |
| 88 | |
| 89 | while (!msgQueues.containsKey(sessionid) && count++ < 100) { |
| 90 | log.debug("Queue for session {} doesn't exist yet.", sessionid); |
| 91 | try { |
| 92 | Thread.sleep(100); // Wait 100 ms to check again. |
| 93 | } catch (InterruptedException e) { |
| 94 | log.error("Interrupted while waiting for session start"); |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | if (count < 100) { |
| 99 | SessionQueue pQueue = msgQueues.get(sessionid); |
| 100 | BlockingQueue<ByteBuffer> queue = pQueue.getQueue(); |
| 101 | // Block if queue is empty |
| 102 | try { |
| 103 | packets.add(queue.take()); |
| 104 | queue.drainTo(packets); |
| 105 | } catch (InterruptedException e) { |
| 106 | log.error("Interrupted while waiting for packets"); |
| 107 | } |
| 108 | } |
| 109 | |
| 110 | return packets; |
| 111 | } |
| 112 | |
| 113 | /** |
| 114 | * The implementation for pushMessageSync() function. |
| 115 | * |
| 116 | * @param msg |
| 117 | * @return 1 for success, 0 for failure |
| 118 | * @throws TException |
| 119 | */ |
| 120 | @Override |
| 121 | @LogMessageDocs({ |
| 122 | @LogMessageDoc(level="ERROR", |
| 123 | message="Could not push empty message", |
| 124 | explanation="An empty message was sent to the packet streamer", |
| 125 | recommendation=LogMessageDoc.REPORT_CONTROLLER_BUG), |
| 126 | @LogMessageDoc(level="ERROR", |
| 127 | message="queue for session {sessionId} is null", |
| 128 | explanation="The queue for the packet streamer session " + |
| 129 | "is missing", |
| 130 | recommendation=LogMessageDoc.REPORT_CONTROLLER_BUG) |
| 131 | }) |
| 132 | |
| 133 | public int pushMessageSync(Message msg) |
| 134 | throws org.apache.thrift.TException { |
| 135 | |
| 136 | if (msg == null) { |
| 137 | log.error("Could not push empty message"); |
| 138 | return 0; |
| 139 | } |
| 140 | |
| 141 | List<String> sessionids = msg.getSessionIDs(); |
| 142 | for (String sid : sessionids) { |
| 143 | SessionQueue pQueue = null; |
| 144 | |
| 145 | if (!msgQueues.containsKey(sid)) { |
| 146 | pQueue = new SessionQueue(); |
| 147 | msgQueues.put(sid, pQueue); |
| 148 | } else { |
| 149 | pQueue = msgQueues.get(sid); |
| 150 | } |
| 151 | |
| 152 | log.debug("pushMessageSync: SessionId: " + sid + |
| 153 | " Receive a message, " + msg.toString() + "\n"); |
| 154 | ByteBuffer bb = ByteBuffer.wrap(msg.getPacket().getData()); |
| 155 | //ByteBuffer dst = ByteBuffer.wrap(msg.getPacket().toString().getBytes()); |
| 156 | BlockingQueue<ByteBuffer> queue = pQueue.getQueue(); |
| 157 | if (queue != null) { |
| 158 | if (!queue.offer(bb)) { |
| 159 | log.error("Failed to queue message for session: " + sid); |
| 160 | } else { |
| 161 | log.debug("insert a message to session: " + sid); |
| 162 | } |
| 163 | } else { |
| 164 | log.error("queue for session {} is null", sid); |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | return 1; |
| 169 | } |
| 170 | |
| 171 | /** |
| 172 | * The implementation for pushMessageAsync() function. |
| 173 | * |
| 174 | * @param msg |
| 175 | * @throws TException |
| 176 | */ |
| 177 | @Override |
| 178 | public void pushMessageAsync(Message msg) |
| 179 | throws org.apache.thrift.TException { |
| 180 | pushMessageSync(msg); |
| 181 | return; |
| 182 | } |
| 183 | |
| 184 | /** |
| 185 | * The implementation for terminateSession() function. |
| 186 | * It removes the session to queue association. |
| 187 | * @param sessionid |
| 188 | * @throws TException |
| 189 | */ |
| 190 | @Override |
| 191 | public void terminateSession(String sessionid) |
| 192 | throws org.apache.thrift.TException { |
| 193 | if (!msgQueues.containsKey(sessionid)) { |
| 194 | return; |
| 195 | } |
| 196 | |
| 197 | SessionQueue pQueue = msgQueues.get(sessionid); |
| 198 | |
| 199 | log.debug("terminateSession: SessionId: " + sessionid + "\n"); |
| 200 | String data = "FilterTimeout"; |
| 201 | ByteBuffer bb = ByteBuffer.wrap(data.getBytes()); |
| 202 | BlockingQueue<ByteBuffer> queue = pQueue.getQueue(); |
| 203 | if (queue != null) { |
| 204 | if (!queue.offer(bb)) { |
| 205 | log.error("Failed to queue message for session: " + sessionid); |
| 206 | } |
| 207 | msgQueues.remove(sessionid); |
| 208 | } else { |
| 209 | log.error("queue for session {} is null", sessionid); |
| 210 | } |
| 211 | } |
| 212 | } |
| 213 | |