blob: fe365d9885550f1c77ef4b4ddac3fcea98448e36 [file] [log] [blame]
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -07001package net.onrc.onos.apps.websocket;
2
3import net.onrc.onos.core.topology.ITopologyListener;
4import net.onrc.onos.core.topology.ITopologyService;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -07005import net.onrc.onos.core.topology.TopologyEvents;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
Pavlin Radoslavovb5999042014-08-26 16:39:35 -07009import java.nio.charset.StandardCharsets;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070010import java.util.concurrent.BlockingQueue;
11import java.util.concurrent.LinkedBlockingQueue;
Pavlin Radoslavovb5999042014-08-26 16:39:35 -070012import java.util.concurrent.TimeUnit;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070013
14import javax.websocket.CloseReason;
15import javax.websocket.EndpointConfig;
16import javax.websocket.OnClose;
17import javax.websocket.OnError;
18import javax.websocket.OnMessage;
19import javax.websocket.OnOpen;
20import javax.websocket.PongMessage;
21import javax.websocket.Session;
22import javax.websocket.server.ServerEndpoint;
23
24import org.codehaus.jackson.map.ObjectMapper;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
28/**
29 * The Topology WebSocket class. A single instance is allocated per client.
30 *
31 * <p>
32 * The object lifecycle is as follows:
33 * <p>
34 * <ol>
35 * <li> WebSocket Client opens a WebSocket connection to the corresponding
Yuta HIGUCHIccab05d2014-07-26 22:42:28 -070036 * {@literal @ServerEndpoint}: an instance for this class is created, and method
37 * {@literal @OnOpen} is called. </li>
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070038 * <li> If the client sends a text message: method @OnMessage for String
39 * argument is called. </li>
40 * <li> If the client sends a binary message: method @OnMessage
41 * for ByteBuffer argument is called. </li>
42 * <li> If the client sends WebSocket Pong message: method @OnMessage
43 * for PongMessage is called. </li>
44 * <li> If the client closes the connection: method @OnClose is called. </li>
45 * <li> If there is any error with the connection: method @OnError is
46 * called. </li>
47 * </ol>
48 *<p>
49 * When the client opens the WebSocket, the server sends back the whole
50 * topology first. From that moment on, the server sends topology events
51 * (deltas) if there are any changes in the topology. Currently, all objects
52 * are encoded in JSON.
53 */
54@ServerEndpoint(value = "/topology")
55public class TopologyWebSocket extends Thread implements ITopologyListener {
56
57 private static final Logger log =
58 LoggerFactory.getLogger(TopologyWebSocket.class);
59 private BlockingQueue<TopologyEvents> topologyEventsQueue =
60 new LinkedBlockingQueue<>();
61 private Session socketSession;
62 private boolean isOpen = false;
Pavlin Radoslavovb5999042014-08-26 16:39:35 -070063 // Ping-related state
64 private static final int PING_INTERVAL_SEC = 30; // Ping every 30 secs
65 private static final int MAX_MISSING_PING = 3;
66 private int missingPing = 0; // Pings to the client without a pong
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070067
68 /**
69 * Shutdown the socket.
70 */
71 private void shutdown() {
72 ITopologyService topologyService = WebSocketManager.topologyService;
Pavlin Radoslavov054cd592014-08-07 20:57:16 -070073 topologyService.removeListener(this);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070074 this.isOpen = false; // Stop the thread
75 }
76
77 /**
78 * Topology events have been generated.
79 *
80 * @param topologyEvents the generated Topology Events
81 * @see TopologyEvents
82 */
83 @Override
84 public void topologyEvents(TopologyEvents topologyEvents) {
Pavlin Radoslavovb5999042014-08-26 16:39:35 -070085 // The topologyEvents object is immutable, so we can add it as-is
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -070086 this.topologyEventsQueue.add(topologyEvents);
87 }
88
89 /**
90 * Run the thread.
91 */
92 @Override
93 public void run() {
94 this.setName("TopologyWebSocket " + this.getId());
95 ObjectMapper mapper = new ObjectMapper();
96
97 //
Pavlin Radoslavovb5999042014-08-26 16:39:35 -070098 // The main loop for sending events to the clients.
99 //
100 // If there are no events, we send periodic PING messages to discover
101 // unreachable clients.
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700102 //
103 while (this.isOpen && (!this.isInterrupted())) {
104 String eventsJson = null;
105 try {
Pavlin Radoslavovb5999042014-08-26 16:39:35 -0700106 TopologyEvents events =
107 topologyEventsQueue.poll(PING_INTERVAL_SEC,
108 TimeUnit.SECONDS);
109 if (events != null) {
110 // Send the event
111 eventsJson = mapper.writeValueAsString(events);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700112 socketSession.getBasicRemote().sendText(eventsJson);
Pavlin Radoslavovb5999042014-08-26 16:39:35 -0700113 continue;
114 }
115 // Send a PING message
116 missingPing++;
117 if (missingPing > MAX_MISSING_PING) {
118 // Timeout
119 log.debug("WebSocket session timeout");
120 shutdown();
121 } else {
122 String msg = "PING(TopologyWebsocket)";
123 ByteBuffer pingBuffer =
124 ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
125 socketSession.getBasicRemote().sendPing(pingBuffer);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700126 }
127 } catch (IOException e) {
128 log.debug("Exception sending TopologyWebSocket events: ", e);
Pavlin Radoslavovb5999042014-08-26 16:39:35 -0700129 } catch (InterruptedException e) {
130 log.debug("TopologyWebSocket interrupted while waiting: ", e);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700131 } catch (Exception exception) {
132 log.debug("Exception processing TopologyWebSocket events: ",
133 exception);
134 }
135 }
136 }
137
138 /**
139 * Connection opened by a client.
140 *
141 * @param session the WebSocket session for the connection.
142 * @param conf the Endpoint configuration.
143 */
144 @OnOpen
145 public void onOpen(Session session, EndpointConfig conf) {
146 log.debug("WebSocket new session: {}", session.getId());
147 this.isOpen = true;
148
149 //
150 // Initialization and Topology Service registration
151 //
152 this.socketSession = session;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700153 ITopologyService topologyService = WebSocketManager.topologyService;
Pavlin Radoslavov054cd592014-08-07 20:57:16 -0700154 topologyService.addListener(this, true);
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700155
156 // Start the thread
157 start();
158 }
159
160 /**
161 * Received a text message.
162 *
163 * @param session the WebSocket session for the connection.
164 * @param msg the received message.
165 */
166 @OnMessage
167 public void onTextMessage(Session session, String msg) {
168 log.debug("WebSocket Text message received: {}", msg);
169
170 // TODO: Sample code below for sending a response back
171 // NOTE: The transmission here must be synchronized by
172 // by the transmission by another thread within the run() method.
173 //
174 // String result = msg + " (from your server)";
175 // session.getBasicRemote().sendText(result);
176 //
177 // RemoteEndpoint.Basic basic = session.getBasicRemote();
178 // RemoteEndpoint.Async async = session.getAsyncRemote();
179 // session.getAsyncRemote().sendBinary(ByteBuffer data);
180 // session.getAsyncRemote().sendPing(ByteBuffer appData);
181 // session.getAsyncRemote().sendPong(ByteBuffer appData);
182 //
183 }
184
185 /**
186 * Received a binary message.
187 *
188 * @param session the WebSocket session for the connection.
189 * @param msg the received message.
190 */
191 @OnMessage
192 public void onBinaryMessage(Session session, ByteBuffer msg) {
193 log.debug("WebSocket Binary message received: {}", msg);
194 }
195
196 /**
197 * Received a Pong message.
198 *
199 * @param session the WebSocket session for the connection.
200 * @param msg the received message.
201 */
202 @OnMessage
203 public void onPongMessage(Session session, PongMessage msg) {
Pavlin Radoslavovb5999042014-08-26 16:39:35 -0700204 log.trace("WebSocket Pong message received for session: {}",
205 session.getId());
206 missingPing = 0;
Pavlin Radoslavovde4c3892014-07-06 23:19:59 -0700207 }
208
209 /**
210 * Error occured on the connection.
211 *
212 * @param session the WebSocket session for the connection.
213 * @param error the occured error.
214 */
215 @OnError
216 public void onError(Session session, Throwable error) {
217 log.debug("WebSocket session error: ", error);
218 shutdown();
219 }
220
221 /**
222 * Connection closed.
223 *
224 * @param session the WebSocket session for the connection.
225 * @param reason the reason for closing the connection.
226 */
227 @OnClose
228 public void onClose(Session session, CloseReason reason) {
229 log.debug("WebSocket session closed: {}", reason);
230 shutdown();
231 }
232}