Add Server-Sent Events (SSE) support to the REST southbound interface

Change-Id: I77411df608be8a1cab9d828db17202f88b969a0f
diff --git a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
index 83737ca..a527e0e 100644
--- a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
+++ b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
@@ -20,15 +20,23 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.event.ListenerService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.protocol.http.ctl.HttpSBControllerImpl;
 import org.onosproject.protocol.rest.RestSBController;
 import org.onosproject.protocol.rest.RestSBDevice;
+import org.onosproject.protocol.rest.RestSBEventListener;
+import org.onosproject.protocol.rest.RestSBServerSentEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.InboundSseEvent;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -39,15 +47,23 @@
  */
 @Component(immediate = true)
 @Service
-public class RestSBControllerImpl extends HttpSBControllerImpl implements RestSBController {
+public class RestSBControllerImpl extends HttpSBControllerImpl
+        implements RestSBController, ListenerService<RestSBServerSentEvent, RestSBEventListener> {
 
     private static final Logger log =
             LoggerFactory.getLogger(RestSBControllerImpl.class);
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    protected final ListenerRegistry<RestSBServerSentEvent, RestSBEventListener> listenerRegistry =
+            new ListenerRegistry<>();
+
     private final Map<DeviceId, RestSBDevice> proxiedDeviceMap = new ConcurrentHashMap<>();
 
     @Activate
     public void activate() {
+        eventDispatcher.addSink(RestSBServerSentEvent.class, listenerRegistry); // sink for posted SSE events
         log.info("Started");
     }
 
@@ -55,6 +71,10 @@
     public void deactivate() {
+        // Undo the addSink() done in activate() so the dispatcher does not keep a stale sink.
+        eventDispatcher.removeSink(RestSBServerSentEvent.class);
         this.getClientMap().clear();
         this.getDeviceMap().clear();
+        // NOTE(review): clear() only drops references; confirm the SSE sources are closed elsewhere.
+        this.getSseEventSourceMap().clear();
         log.info("Stopped");
     }
 
@@ -118,4 +135,36 @@
         }
 
     }
+
+    @Override
+    public void startServerSentEvents(DeviceId deviceId, String eventsUrl) {
+        // Assumes error is a Throwable (TODO confirm): as the last, unformatted arg SLF4J logs its trace.
+        this.getServerSentEvents(deviceId, eventsUrl,
+                (event) -> sendEvent(event, deviceId),
+                (error) -> log.error("Unable to handle {} SSEvent from {}", eventsUrl, deviceId, error));
+    }
+
+    @Override
+    public void addListener(RestSBEventListener listener) {
+        listenerRegistry.addListener(listener); // same registry that activate() registers as the event sink
+    }
+
+    @Override
+    public void removeListener(RestSBEventListener listener) {
+        listenerRegistry.removeListener(listener); // same registry that activate() registers as the event sink
+    }
+
+    /**
+     * Wraps an inbound server-sent event in a {@link RestSBServerSentEvent} of
+     * type {@code SSE_INBOUND} and posts it to the event dispatcher.
+     * Nothing is posted when either the event or the dispatcher is null.
+     * @param sseEvent inbound event to wrap and post; may be null
+     * @param deviceId the device that sent the event
+     */
+    protected void sendEvent(InboundSseEvent sseEvent, DeviceId deviceId) {
+        if (sseEvent == null || eventDispatcher == null) {
+            return;
+        }
+        eventDispatcher.post(new RestSBServerSentEvent(RestSBServerSentEvent.Type.SSE_INBOUND, deviceId, sseEvent));
+    }
 }