blob: 1a83b941ca6acc5a89bd2328983bb92d251621eb [file] [log] [blame]
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -07001package net.onrc.onos.apps.websocket;
2
3import net.onrc.onos.core.topology.ITopologyListener;
4import net.onrc.onos.core.topology.ITopologyService;
5import net.onrc.onos.core.topology.Topology;
6import net.onrc.onos.core.topology.TopologyEvents;
7
8import java.io.IOException;
9import java.nio.ByteBuffer;
10import java.util.concurrent.BlockingQueue;
11import java.util.concurrent.LinkedBlockingQueue;
12
13import javax.websocket.CloseReason;
14import javax.websocket.EndpointConfig;
15import javax.websocket.OnClose;
16import javax.websocket.OnError;
17import javax.websocket.OnMessage;
18import javax.websocket.OnOpen;
19import javax.websocket.PongMessage;
20import javax.websocket.Session;
21import javax.websocket.server.ServerEndpoint;
22
23import org.codehaus.jackson.map.ObjectMapper;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26
27/**
28 * The Topology WebSocket class. A single instance is allocated per client.
29 *
30 * <p>
31 * The object lifecycle is as follows:
32 * <p>
33 * <ol>
34 * <li> WebSocket Client opens a WebSocket connection to the corresponding
Yuta HIGUCHIccab05d2014-07-26 22:42:28 -070035 * {@literal @ServerEndpoint}: an instance for this class is created, and method
36 * {@literal @OnOpen} is called. </li>
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070037 * <li> If the client sends a text message: method @OnMessage for String
38 * argument is called. </li>
39 * <li> If the client sends a binary message: method @OnMessage
40 * for ByteBuffer argument is called. </li>
41 * <li> If the client sends WebSocket Pong message: method @OnMessage
42 * for PongMessage is called. </li>
43 * <li> If the client closes the connection: method @OnClose is called. </li>
44 * <li> If there is any error with the connection: method @OnError is
45 * called. </li>
46 * </ol>
47 *<p>
48 * When the client opens the WebSocket, the server sends back the whole
49 * topology first. From that moment on, the server sends topology events
50 * (deltas) if there are any changes in the topology. Currently, all objects
51 * are encoded in JSON.
52 */
53@ServerEndpoint(value = "/topology")
54public class TopologyWebSocket extends Thread implements ITopologyListener {
55
56 private static final Logger log =
57 LoggerFactory.getLogger(TopologyWebSocket.class);
58 private BlockingQueue<TopologyEvents> topologyEventsQueue =
59 new LinkedBlockingQueue<>();
60 private Session socketSession;
61 private boolean isOpen = false;
62
63 /**
64 * Shutdown the socket.
65 */
66 private void shutdown() {
67 ITopologyService topologyService = WebSocketManager.topologyService;
68 topologyService.deregisterTopologyListener(this);
69 this.isOpen = false; // Stop the thread
70 }
71
72 /**
73 * Topology events have been generated.
74 *
75 * @param topologyEvents the generated Topology Events
76 * @see TopologyEvents
77 */
78 @Override
79 public void topologyEvents(TopologyEvents topologyEvents) {
80 // The topologyEvents object is a deep copy so we can add it as-is
81 this.topologyEventsQueue.add(topologyEvents);
82 }
83
84 /**
85 * Run the thread.
86 */
87 @Override
88 public void run() {
89 this.setName("TopologyWebSocket " + this.getId());
90 ObjectMapper mapper = new ObjectMapper();
91
92 //
93 // The main loop for sending events to the clients
94 //
95 while (this.isOpen && (!this.isInterrupted())) {
96 String eventsJson = null;
97 try {
98 TopologyEvents events = topologyEventsQueue.take();
99 eventsJson = mapper.writeValueAsString(events);
100 if (eventsJson != null) {
101 socketSession.getBasicRemote().sendText(eventsJson);
102 }
103 } catch (IOException e) {
104 log.debug("Exception sending TopologyWebSocket events: ", e);
105 } catch (Exception exception) {
106 log.debug("Exception processing TopologyWebSocket events: ",
107 exception);
108 }
109 }
110 }
111
112 /**
113 * Connection opened by a client.
114 *
115 * @param session the WebSocket session for the connection.
116 * @param conf the Endpoint configuration.
117 */
118 @OnOpen
119 public void onOpen(Session session, EndpointConfig conf) {
120 log.debug("WebSocket new session: {}", session.getId());
121 this.isOpen = true;
122
123 //
124 // Initialization and Topology Service registration
125 //
126 this.socketSession = session;
127 ObjectMapper mapper = new ObjectMapper();
128 String topologyJson = null;
129 ITopologyService topologyService = WebSocketManager.topologyService;
130 topologyService.registerTopologyListener(this);
131
132 //
133 // Get the initial topology and encode it in JSON
134 //
135 Topology topology = topologyService.getTopology();
136 topology.acquireReadLock();
137 try {
138 topologyJson = mapper.writeValueAsString(topology);
139 } catch (IOException e) {
140 log.debug("Exception encoding topology as JSON: ", e);
141 } finally {
142 topology.releaseReadLock();
143 }
144
145 //
146 // Send the initial topology
147 //
148 if (topologyJson != null) {
149 try {
150 session.getBasicRemote().sendText(topologyJson);
151 } catch (IOException e) {
152 log.debug("Exception sending TopologyWebSocket topology: ", e);
153 }
154 }
155
156 // Start the thread
157 start();
158 }
159
160 /**
161 * Received a text message.
162 *
163 * @param session the WebSocket session for the connection.
164 * @param msg the received message.
165 */
166 @OnMessage
167 public void onTextMessage(Session session, String msg) {
168 log.debug("WebSocket Text message received: {}", msg);
169
170 // TODO: Sample code below for sending a response back
171 // NOTE: The transmission here must be synchronized by
172 // by the transmission by another thread within the run() method.
173 //
174 // String result = msg + " (from your server)";
175 // session.getBasicRemote().sendText(result);
176 //
177 // RemoteEndpoint.Basic basic = session.getBasicRemote();
178 // RemoteEndpoint.Async async = session.getAsyncRemote();
179 // session.getAsyncRemote().sendBinary(ByteBuffer data);
180 // session.getAsyncRemote().sendPing(ByteBuffer appData);
181 // session.getAsyncRemote().sendPong(ByteBuffer appData);
182 //
183 }
184
185 /**
186 * Received a binary message.
187 *
188 * @param session the WebSocket session for the connection.
189 * @param msg the received message.
190 */
191 @OnMessage
192 public void onBinaryMessage(Session session, ByteBuffer msg) {
193 log.debug("WebSocket Binary message received: {}", msg);
194 }
195
196 /**
197 * Received a Pong message.
198 *
199 * @param session the WebSocket session for the connection.
200 * @param msg the received message.
201 */
202 @OnMessage
203 public void onPongMessage(Session session, PongMessage msg) {
204 log.debug("WebSocket Pong message received: {}",
205 msg.getApplicationData());
206 }
207
208 /**
209 * Error occured on the connection.
210 *
211 * @param session the WebSocket session for the connection.
212 * @param error the occured error.
213 */
214 @OnError
215 public void onError(Session session, Throwable error) {
216 log.debug("WebSocket session error: ", error);
217 shutdown();
218 }
219
220 /**
221 * Connection closed.
222 *
223 * @param session the WebSocket session for the connection.
224 * @param reason the reason for closing the connection.
225 */
226 @OnClose
227 public void onClose(Session session, CloseReason reason) {
228 log.debug("WebSocket session closed: {}", reason);
229 shutdown();
230 }
231}