blob: 9d9ed6f94c30392b96b6bec7fd6e0636ec8dc3ee [file] [log] [blame]
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -07001package net.onrc.onos.apps.websocket;
2
3import net.onrc.onos.core.topology.ITopologyListener;
4import net.onrc.onos.core.topology.ITopologyService;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -07005import net.onrc.onos.core.topology.TopologyEvents;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
9import java.util.concurrent.BlockingQueue;
10import java.util.concurrent.LinkedBlockingQueue;
11
12import javax.websocket.CloseReason;
13import javax.websocket.EndpointConfig;
14import javax.websocket.OnClose;
15import javax.websocket.OnError;
16import javax.websocket.OnMessage;
17import javax.websocket.OnOpen;
18import javax.websocket.PongMessage;
19import javax.websocket.Session;
20import javax.websocket.server.ServerEndpoint;
21
22import org.codehaus.jackson.map.ObjectMapper;
23import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26/**
27 * The Topology WebSocket class. A single instance is allocated per client.
28 *
29 * <p>
30 * The object lifecycle is as follows:
31 * <p>
32 * <ol>
33 * <li> WebSocket Client opens a WebSocket connection to the corresponding
Yuta HIGUCHIccab05d2014-07-26 22:42:28 -070034 * {@literal @ServerEndpoint}: an instance for this class is created, and method
35 * {@literal @OnOpen} is called. </li>
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070036 * <li> If the client sends a text message: method @OnMessage for String
37 * argument is called. </li>
38 * <li> If the client sends a binary message: method @OnMessage
39 * for ByteBuffer argument is called. </li>
40 * <li> If the client sends WebSocket Pong message: method @OnMessage
41 * for PongMessage is called. </li>
42 * <li> If the client closes the connection: method @OnClose is called. </li>
43 * <li> If there is any error with the connection: method @OnError is
44 * called. </li>
45 * </ol>
46 *<p>
47 * When the client opens the WebSocket, the server sends back the whole
48 * topology first. From that moment on, the server sends topology events
49 * (deltas) if there are any changes in the topology. Currently, all objects
50 * are encoded in JSON.
51 */
52@ServerEndpoint(value = "/topology")
53public class TopologyWebSocket extends Thread implements ITopologyListener {
54
55 private static final Logger log =
56 LoggerFactory.getLogger(TopologyWebSocket.class);
57 private BlockingQueue<TopologyEvents> topologyEventsQueue =
58 new LinkedBlockingQueue<>();
59 private Session socketSession;
60 private boolean isOpen = false;
61
62 /**
63 * Shutdown the socket.
64 */
65 private void shutdown() {
66 ITopologyService topologyService = WebSocketManager.topologyService;
Pavlin Radoslavov054cd592014-08-07 20:57:16 -070067 topologyService.removeListener(this);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070068 this.isOpen = false; // Stop the thread
69 }
70
71 /**
72 * Topology events have been generated.
73 *
74 * @param topologyEvents the generated Topology Events
75 * @see TopologyEvents
76 */
77 @Override
78 public void topologyEvents(TopologyEvents topologyEvents) {
79 // The topologyEvents object is a deep copy so we can add it as-is
80 this.topologyEventsQueue.add(topologyEvents);
81 }
82
83 /**
84 * Run the thread.
85 */
86 @Override
87 public void run() {
88 this.setName("TopologyWebSocket " + this.getId());
89 ObjectMapper mapper = new ObjectMapper();
90
91 //
92 // The main loop for sending events to the clients
93 //
94 while (this.isOpen && (!this.isInterrupted())) {
95 String eventsJson = null;
96 try {
97 TopologyEvents events = topologyEventsQueue.take();
98 eventsJson = mapper.writeValueAsString(events);
99 if (eventsJson != null) {
100 socketSession.getBasicRemote().sendText(eventsJson);
101 }
102 } catch (IOException e) {
103 log.debug("Exception sending TopologyWebSocket events: ", e);
104 } catch (Exception exception) {
105 log.debug("Exception processing TopologyWebSocket events: ",
106 exception);
107 }
108 }
109 }
110
111 /**
112 * Connection opened by a client.
113 *
114 * @param session the WebSocket session for the connection.
115 * @param conf the Endpoint configuration.
116 */
117 @OnOpen
118 public void onOpen(Session session, EndpointConfig conf) {
119 log.debug("WebSocket new session: {}", session.getId());
120 this.isOpen = true;
121
122 //
123 // Initialization and Topology Service registration
124 //
125 this.socketSession = session;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700126 ITopologyService topologyService = WebSocketManager.topologyService;
Pavlin Radoslavov054cd592014-08-07 20:57:16 -0700127 topologyService.addListener(this, true);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700128
129 // Start the thread
130 start();
131 }
132
133 /**
134 * Received a text message.
135 *
136 * @param session the WebSocket session for the connection.
137 * @param msg the received message.
138 */
139 @OnMessage
140 public void onTextMessage(Session session, String msg) {
141 log.debug("WebSocket Text message received: {}", msg);
142
143 // TODO: Sample code below for sending a response back
144 // NOTE: The transmission here must be synchronized by
145 // by the transmission by another thread within the run() method.
146 //
147 // String result = msg + " (from your server)";
148 // session.getBasicRemote().sendText(result);
149 //
150 // RemoteEndpoint.Basic basic = session.getBasicRemote();
151 // RemoteEndpoint.Async async = session.getAsyncRemote();
152 // session.getAsyncRemote().sendBinary(ByteBuffer data);
153 // session.getAsyncRemote().sendPing(ByteBuffer appData);
154 // session.getAsyncRemote().sendPong(ByteBuffer appData);
155 //
156 }
157
158 /**
159 * Received a binary message.
160 *
161 * @param session the WebSocket session for the connection.
162 * @param msg the received message.
163 */
164 @OnMessage
165 public void onBinaryMessage(Session session, ByteBuffer msg) {
166 log.debug("WebSocket Binary message received: {}", msg);
167 }
168
169 /**
170 * Received a Pong message.
171 *
172 * @param session the WebSocket session for the connection.
173 * @param msg the received message.
174 */
175 @OnMessage
176 public void onPongMessage(Session session, PongMessage msg) {
177 log.debug("WebSocket Pong message received: {}",
178 msg.getApplicationData());
179 }
180
181 /**
182 * Error occured on the connection.
183 *
184 * @param session the WebSocket session for the connection.
185 * @param error the occured error.
186 */
187 @OnError
188 public void onError(Session session, Throwable error) {
189 log.debug("WebSocket session error: ", error);
190 shutdown();
191 }
192
193 /**
194 * Connection closed.
195 *
196 * @param session the WebSocket session for the connection.
197 * @param reason the reason for closing the connection.
198 */
199 @OnClose
200 public void onClose(Session session, CloseReason reason) {
201 log.debug("WebSocket session closed: {}", reason);
202 shutdown();
203 }
204}