Added Topology WebSocket server implementation.
It allows remote applications (e.g., the GUI) to subscribe
for Topology-related events.
The URL is:
ws://hostname:8081/ws/onos/topology

When a new client opens a socket, the server transmits the whole topology.
From that moment on, the server sends topology events (deltas) if there
are any changes in the topology. Currently, all objects are encoded in JSON.

The default WebSocket port number is set to 8081
(configurable in conf/onos.properties)

NOTE: In the current implementation, the initial fetching of the topology,
and adding topology events for transmission are done by two different threads.
Hence, when a new client opens a socket, there is a small window of time
when some of the added events are "moving back in time".
The final result of applying all events will be technically correct, but
those "moving back in time" events are semantically incorrect.
This issue will be addressed in later iterations / refactoring of the
implementation.

Change-Id: Id2cbf72d1384208201d916e0aa1e5926659878ee
diff --git a/conf/onos.properties b/conf/onos.properties
index a19cee5..e2b0bdc 100644
--- a/conf/onos.properties
+++ b/conf/onos.properties
@@ -6,12 +6,14 @@
 net.onrc.onos.core.intent.runtime.PathCalcRuntimeModule,\
 net.onrc.onos.core.intent.runtime.PlanInstallModule,\
 net.onrc.onos.core.registry.ZookeeperRegistry, \
-net.onrc.onos.core.metrics.OnosMetricsModule
+net.onrc.onos.core.metrics.OnosMetricsModule, \
+net.onrc.onos.apps.websocket.WebSocketModule
 net.floodlightcontroller.restserver.RestApiServer.port = 8080
 net.floodlightcontroller.core.FloodlightProvider.openflowport = 6633
 net.floodlightcontroller.core.FloodlightProvider.workerthreads = 16
 net.floodlightcontroller.forwarding.Forwarding.idletimeout = 5
 net.floodlightcontroller.forwarding.Forwarding.hardtimeout = 0
+net.onrc.onos.apps.websocket.WebSocketModule.port = 8081
 # NOTE: Do NOT modify or remove the line below. This value will be overwritten by onos.sh script.
 net.onrc.onos.core.datagrid.HazelcastDatagrid.datagridConfig = 
 # Uncomment and list all the ZooKeeper instances after localhost on multi-instance deployment.
diff --git a/pom.xml b/pom.xml
index 61f8a8c..2c5762a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -506,12 +506,32 @@
       <version>${hazelcast.version}</version>
     </dependency>
     <dependency>
+      <groupId>javax.websocket</groupId>
+      <artifactId>javax.websocket-api</artifactId>
+      <version>1.0</version>
+    </dependency>
+    <dependency>
       <groupId>net.sf.json-lib</groupId>
       <artifactId>json-lib</artifactId>
       <version>2.4</version>
       <classifier>jdk15</classifier>
     </dependency>
     <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-server</artifactId>
+      <version>9.2.1.v20140609</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-servlet</artifactId>
+      <version>9.2.1.v20140609</version>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>javax-websocket-server-impl</artifactId>
+      <version>9.2.1.v20140609</version>
+    </dependency>
+    <dependency>
       <groupId>org.restlet.jse</groupId>
       <artifactId>org.restlet</artifactId>
       <version>${restlet.version}</version>
diff --git a/src/main/java/net/onrc/onos/apps/websocket/IWebSocketService.java b/src/main/java/net/onrc/onos/apps/websocket/IWebSocketService.java
new file mode 100644
index 0000000..69c85ac
--- /dev/null
+++ b/src/main/java/net/onrc/onos/apps/websocket/IWebSocketService.java
@@ -0,0 +1,9 @@
+package net.onrc.onos.apps.websocket;
+
+import net.floodlightcontroller.core.module.IFloodlightService;
+
+/**
+ * Interface for providing the WebSocket service to other modules.
+ */
+public interface IWebSocketService extends IFloodlightService {
+}
diff --git a/src/main/java/net/onrc/onos/apps/websocket/TopologyWebSocket.java b/src/main/java/net/onrc/onos/apps/websocket/TopologyWebSocket.java
new file mode 100644
index 0000000..f706944
--- /dev/null
+++ b/src/main/java/net/onrc/onos/apps/websocket/TopologyWebSocket.java
@@ -0,0 +1,231 @@
+package net.onrc.onos.apps.websocket;
+
+import net.onrc.onos.core.topology.ITopologyListener;
+import net.onrc.onos.core.topology.ITopologyService;
+import net.onrc.onos.core.topology.Topology;
+import net.onrc.onos.core.topology.TopologyEvents;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.websocket.CloseReason;
+import javax.websocket.EndpointConfig;
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.PongMessage;
+import javax.websocket.Session;
+import javax.websocket.server.ServerEndpoint;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The Topology WebSocket class. A single instance is allocated per client.
+ *
+ * <p>
+ * The object lifecycle is as follows:
+ * <p>
+ * <ol>
+ * <li> WebSocket Client opens a WebSocket connection to the corresponding
+ *    @ServerEndpoint: an instance for this class is created, and method
+ *    @OnOpen is called. </li>
+ * <li> If the client sends a text message: method @OnMessage for String
+ *    argument is called. </li>
+ * <li> If the client sends a binary message: method @OnMessage
+ *    for ByteBuffer argument is called. </li>
+ * <li> If the client sends WebSocket Pong message: method @OnMessage
+ *    for PongMessage is called. </li>
+ * <li> If the client closes the connection: method @OnClose is called. </li>
+ * <li> If there is any error with the connection: method @OnError is
+ *    called. </li>
+ * </ol>
+ *<p>
+ * When the client opens the WebSocket, the server sends back the whole
+ * topology first. From that moment on, the server sends topology events
+ * (deltas) if there are any changes in the topology. Currently, all objects
+ * are encoded in JSON.
+ */
+@ServerEndpoint(value = "/topology")
+public class TopologyWebSocket extends Thread implements ITopologyListener {
+
+    private static final Logger log =
+        LoggerFactory.getLogger(TopologyWebSocket.class);
+    private BlockingQueue<TopologyEvents> topologyEventsQueue =
+        new LinkedBlockingQueue<>();
+    private Session socketSession;
+    private boolean isOpen = false;
+
+    /**
+     * Shutdown the socket.
+     */
+    private void shutdown() {
+        ITopologyService topologyService = WebSocketManager.topologyService;
+        topologyService.deregisterTopologyListener(this);
+        this.isOpen = false;            // Stop the thread
+    }
+
+    /**
+     * Topology events have been generated.
+     *
+     * @param topologyEvents the generated Topology Events
+     * @see TopologyEvents
+     */
+    @Override
+    public void topologyEvents(TopologyEvents topologyEvents) {
+        // The topologyEvents object is a deep copy so we can add it as-is
+        this.topologyEventsQueue.add(topologyEvents);
+    }
+
+    /**
+     * Run the thread.
+     */
+    @Override
+    public void run() {
+        this.setName("TopologyWebSocket " + this.getId());
+        ObjectMapper mapper = new ObjectMapper();
+
+        //
+        // The main loop for sending events to the clients
+        //
+        while (this.isOpen && (!this.isInterrupted())) {
+            String eventsJson = null;
+            try {
+                TopologyEvents events = topologyEventsQueue.take();
+                eventsJson = mapper.writeValueAsString(events);
+                if (eventsJson != null) {
+                    socketSession.getBasicRemote().sendText(eventsJson);
+                }
+            } catch (IOException e) {
+                log.debug("Exception sending TopologyWebSocket events: ", e);
+            } catch (Exception exception) {
+                log.debug("Exception processing TopologyWebSocket events: ",
+                          exception);
+            }
+        }
+    }
+
+    /**
+     * Connection opened by a client.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param conf the Endpoint configuration.
+     */
+    @OnOpen
+    public void onOpen(Session session, EndpointConfig conf) {
+        log.debug("WebSocket new session: {}", session.getId());
+        this.isOpen = true;
+
+        //
+        // Initialization and Topology Service registration
+        //
+        this.socketSession = session;
+        ObjectMapper mapper = new ObjectMapper();
+        String topologyJson = null;
+        ITopologyService topologyService = WebSocketManager.topologyService;
+        topologyService.registerTopologyListener(this);
+
+        //
+        // Get the initial topology and encode it in JSON
+        //
+        Topology topology = topologyService.getTopology();
+        topology.acquireReadLock();
+        try {
+            topologyJson = mapper.writeValueAsString(topology);
+        } catch (IOException e) {
+            log.debug("Exception encoding topology as JSON: ", e);
+        } finally {
+            topology.releaseReadLock();
+        }
+
+        //
+        // Send the initial topology
+        //
+        if (topologyJson != null) {
+            try {
+                session.getBasicRemote().sendText(topologyJson);
+            } catch (IOException e) {
+                log.debug("Exception sending TopologyWebSocket topology: ", e);
+            }
+        }
+
+        // Start the thread
+        start();
+    }
+
+    /**
+     * Received a text message.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param msg the received message.
+     */
+    @OnMessage
+    public void onTextMessage(Session session, String msg) {
+        log.debug("WebSocket Text message received: {}", msg);
+
+        // TODO: Sample code below for sending a response back
+        // NOTE: The transmission here must be synchronized by
+        // by the transmission by another thread within the run() method.
+        //
+        // String result = msg + " (from your server)";
+        // session.getBasicRemote().sendText(result);
+        //
+        // RemoteEndpoint.Basic basic = session.getBasicRemote();
+        // RemoteEndpoint.Async async = session.getAsyncRemote();
+        // session.getAsyncRemote().sendBinary(ByteBuffer data);
+        // session.getAsyncRemote().sendPing(ByteBuffer appData);
+        // session.getAsyncRemote().sendPong(ByteBuffer appData);
+        //
+    }
+
+    /**
+     * Received a binary message.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param msg the received message.
+     */
+    @OnMessage
+    public void onBinaryMessage(Session session, ByteBuffer msg) {
+        log.debug("WebSocket Binary message received: {}", msg);
+    }
+
+    /**
+     * Received a Pong message.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param msg the received message.
+     */
+    @OnMessage
+    public void onPongMessage(Session session, PongMessage msg) {
+        log.debug("WebSocket Pong message received: {}",
+                  msg.getApplicationData());
+    }
+
+    /**
+     * Error occured on the connection.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param error the occured error.
+     */
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.debug("WebSocket session error: ", error);
+        shutdown();
+    }
+
+    /**
+     * Connection closed.
+     *
+     * @param session the WebSocket session for the connection.
+     * @param reason the reason for closing the connection.
+     */
+    @OnClose
+    public void onClose(Session session, CloseReason reason) {
+        log.debug("WebSocket session closed: {}", reason);
+        shutdown();
+    }
+}
diff --git a/src/main/java/net/onrc/onos/apps/websocket/WebSocketManager.java b/src/main/java/net/onrc/onos/apps/websocket/WebSocketManager.java
new file mode 100644
index 0000000..9cddfae
--- /dev/null
+++ b/src/main/java/net/onrc/onos/apps/websocket/WebSocketManager.java
@@ -0,0 +1,100 @@
+package net.onrc.onos.apps.websocket;
+
+import javax.websocket.DeploymentException;
+import javax.websocket.server.ServerContainer;
+
+import net.onrc.onos.core.topology.ITopologyService;
+
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.websocket.jsr356.server.deploy.WebSocketServerContainerInitializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+/**
+ * The WebSocket manager class.
+ * There is a single instance for all WebSocket endpoints.
+ */
+class WebSocketManager {
+    protected static ITopologyService topologyService;
+
+    private static final Logger log =
+        LoggerFactory.getLogger(WebSocketManager.class);
+    private int webSocketPort;
+    private JettyServer jettyServer;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService the Topology Service to use.
+     * @param webSocketPort the WebSocket port to use.
+     */
+    @SuppressFBWarnings(value = "ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
+                        justification = "The writing to WebSocketManager.topologyService happens only once on startup")
+    WebSocketManager(ITopologyService topologyService,
+                     int webSocketPort) {
+        WebSocketManager.topologyService = topologyService;
+        this.webSocketPort = webSocketPort;
+    }
+
+    /**
+     * Startup processing.
+     */
+    void startup() {
+        log.debug("Starting WebSocket server on port {}", webSocketPort);
+
+        jettyServer = new JettyServer(webSocketPort);
+        this.jettyServer.start();
+    }
+
+    /**
+     * Class for creating the WebSocket server and associated state.
+     */
+    static class JettyServer extends Thread {
+        private Server server;
+        private ServletContextHandler context;
+        private ServerContainer container;
+
+        /**
+         * Constructor.
+         *
+         * @param port the port to listen on.
+         */
+        JettyServer(final int port) {
+            server = new Server(port);
+
+            // Initialize the context handler
+            context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+            context.setContextPath("/ws/onos");
+            server.setHandler(context);
+
+            // Initialize the WebSocket layer
+            container =
+                WebSocketServerContainerInitializer.configureContext(context);
+            try {
+                container.addEndpoint(TopologyWebSocket.class);
+            } catch (DeploymentException e) {
+                log.debug("Exception adding WebSocket endpoint: ", e);
+            }
+        }
+
+        /**
+         * Run the thread.
+         */
+        @Override
+        public void run() {
+            try {
+                this.server.start();
+            } catch (final Exception e) {
+                log.debug("Exception starting the WebSocket server: ", e);
+            }
+            try {
+                this.server.join();
+            } catch (final InterruptedException e) {
+                log.debug("Exception joining the WebSocket server: ", e);
+            }
+        }
+    }
+}
diff --git a/src/main/java/net/onrc/onos/apps/websocket/WebSocketModule.java b/src/main/java/net/onrc/onos/apps/websocket/WebSocketModule.java
new file mode 100644
index 0000000..d50458b
--- /dev/null
+++ b/src/main/java/net/onrc/onos/apps/websocket/WebSocketModule.java
@@ -0,0 +1,72 @@
+package net.onrc.onos.apps.websocket;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import net.floodlightcontroller.core.module.FloodlightModuleContext;
+import net.floodlightcontroller.core.module.FloodlightModuleException;
+import net.floodlightcontroller.core.module.IFloodlightModule;
+import net.floodlightcontroller.core.module.IFloodlightService;
+import net.onrc.onos.core.topology.ITopologyService;
+
+/**
+ * The WebSocket module class.
+ */
+public class WebSocketModule implements IFloodlightModule, IWebSocketService {
+    private WebSocketManager webSocketManager;
+    private static final int DEFAULT_WEBSOCKET_PORT = 8081;
+
+    @Override
+    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
+        List<Class<? extends IFloodlightService>> services =
+                new ArrayList<Class<? extends IFloodlightService>>();
+        services.add(IWebSocketService.class);
+        return services;
+    }
+
+    @Override
+    public Map<Class<? extends IFloodlightService>, IFloodlightService>
+    getServiceImpls() {
+        Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+                new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
+        impls.put(IWebSocketService.class, this);
+        return impls;
+    }
+
+    @Override
+    public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
+        List<Class<? extends IFloodlightService>> dependencies =
+                new ArrayList<Class<? extends IFloodlightService>>();
+        dependencies.add(ITopologyService.class);
+        return dependencies;
+    }
+
+    @Override
+    public void init(FloodlightModuleContext context)
+            throws FloodlightModuleException {
+        ITopologyService topologyService =
+            context.getServiceImpl(ITopologyService.class);
+
+        //
+        // Read the configuration options
+        //
+        int webSocketPort = DEFAULT_WEBSOCKET_PORT;
+        Map<String, String> configOptions = context.getConfigParams(this);
+        String port = configOptions.get("port");
+        if (port != null) {
+            webSocketPort = Integer.parseInt(port);
+        }
+
+        // Initialize the WebSocketManager
+        webSocketManager = new WebSocketManager(topologyService,
+                                                webSocketPort);
+    }
+
+    @Override
+    public void startUp(FloodlightModuleContext context) {
+        webSocketManager.startup();
+    }
+}
diff --git a/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule b/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
index 28c4a83..5e6c88a 100644
--- a/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
+++ b/src/main/resources/META-INF/services/net.floodlightcontroller.core.module.IFloodlightModule
@@ -8,6 +8,7 @@
 net.onrc.onos.core.datagrid.HazelcastDatagrid
 net.onrc.onos.core.flowprogrammer.FlowProgrammer
 net.onrc.onos.apps.sdnip.SdnIp
+net.onrc.onos.apps.websocket.WebSocketModule
 net.onrc.onos.core.registry.ZookeeperRegistry
 net.onrc.onos.core.registry.StandaloneRegistry
 net.onrc.onos.apps.forwarding.Forwarding