Adding ServerSentEvents to Rest Southbound interface
Change-Id: I77411df608be8a1cab9d828db17202f88b969a0f
diff --git a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
index 83737ca..a527e0e 100644
--- a/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
+++ b/protocols/rest/ctl/src/main/java/org/onosproject/protocol/rest/ctl/RestSBControllerImpl.java
@@ -20,15 +20,23 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.event.ListenerService;
import org.onosproject.net.DeviceId;
import org.onosproject.protocol.http.ctl.HttpSBControllerImpl;
import org.onosproject.protocol.rest.RestSBController;
import org.onosproject.protocol.rest.RestSBDevice;
+import org.onosproject.protocol.rest.RestSBEventListener;
+import org.onosproject.protocol.rest.RestSBServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.sse.InboundSseEvent;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -39,15 +47,23 @@
*/
@Component(immediate = true)
@Service
-public class RestSBControllerImpl extends HttpSBControllerImpl implements RestSBController {
+public class RestSBControllerImpl extends HttpSBControllerImpl
+ implements RestSBController, ListenerService<RestSBServerSentEvent, RestSBEventListener> {
private static final Logger log =
LoggerFactory.getLogger(RestSBControllerImpl.class);
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDispatcher; // used below to register the sink and to post events
+
+ protected final ListenerRegistry<RestSBServerSentEvent, RestSBEventListener> listenerRegistry =
+ new ListenerRegistry<>(); // target of addListener()/removeListener() and the sink passed to addSink()
+
private final Map<DeviceId, RestSBDevice> proxiedDeviceMap = new ConcurrentHashMap<>();
@Activate
public void activate() {
+ eventDispatcher.addSink(RestSBServerSentEvent.class, listenerRegistry); // sink for posted SSE events
log.info("Started");
}
@@ -55,6 +71,10 @@
public void deactivate() {
this.getClientMap().clear();
this.getDeviceMap().clear();
+ // NOTE(review): the map is only cleared here; confirm its SSE event sources are closed elsewhere.
+ this.getSseEventSourceMap().clear();
+ // Undo the addSink() done in activate() so the sink does not outlive this component.
+ eventDispatcher.removeSink(RestSBServerSentEvent.class);
log.info("Stopped");
}
@@ -118,4 +135,36 @@
}
}
+
+ @Override
+ public void startServerSentEvents(DeviceId deviceId, String eventsUrl) {
+ // Each event handed to the first callback is forwarded to sendEvent(); the second callback only logs.
+ // NOTE(review): if that second argument is a Throwable, consider passing it without a "{}" — confirm.
+ this.getServerSentEvents(deviceId, eventsUrl, inbound -> sendEvent(inbound, deviceId),
+ failure -> log.error("Unable to handle {} SSEvent from {}. {}", eventsUrl, deviceId, failure));
+ }
+
+ @Override
+ public void addListener(RestSBEventListener listener) {
+ listenerRegistry.addListener(listener); // same registry that activate() registers as event sink
+ }
+
+ @Override
+ public void removeListener(RestSBEventListener listener) {
+ listenerRegistry.removeListener(listener); // same registry that activate() registers as event sink
+ }
+
+ /**
+ * Wraps an inbound server-sent event in a {@link RestSBServerSentEvent} of
+ * type {@code SSE_INBOUND} and posts it to the event dispatcher.
+ * A null event or a null dispatcher turns this call into a no-op.
+ * @param sseEvent inbound event to forward; may be null
+ * @param deviceId identifier of the device the event is attributed to
+ */
+ protected void sendEvent(InboundSseEvent sseEvent, DeviceId deviceId) {
+ if (sseEvent == null || eventDispatcher == null) {
+ return;
+ }
+ eventDispatcher.post(new RestSBServerSentEvent(RestSBServerSentEvent.Type.SSE_INBOUND, deviceId, sseEvent));
+ }
}