blob: 9677265d367866d3dbb5cd4441b6d111b1fec108 [file] [log] [blame]
Shravan Ambati7d199542016-04-22 16:09:05 -07001/**
2 * Copyright 2016-present Open Networking Laboratory
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6
7 * http://www.apache.org/licenses/LICENSE-2.0
8
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15package org.onosproject.kafkaintegration.rest;
16
Jian Lic2a542b2016-05-10 11:48:19 -070017import com.fasterxml.jackson.databind.ObjectMapper;
18import com.fasterxml.jackson.databind.node.ObjectNode;
Shravan Ambati7d199542016-04-22 16:09:05 -070019import org.onosproject.codec.JsonCodec;
20import org.onosproject.kafkaintegration.api.EventExporterService;
21import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
22import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
23import org.onosproject.rest.AbstractWebResource;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
26
Jian Lic2a542b2016-05-10 11:48:19 -070027import javax.ws.rs.Consumes;
28import javax.ws.rs.DELETE;
29import javax.ws.rs.POST;
30import javax.ws.rs.Path;
31import javax.ws.rs.Produces;
32import javax.ws.rs.core.MediaType;
33import javax.ws.rs.core.Response;
34import java.io.IOException;
35import java.io.InputStream;
36
37import static com.google.common.base.Preconditions.checkNotNull;
38import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
Shravan Ambati7d199542016-04-22 16:09:05 -070039
40/**
41 * Rest Interfaces for subscribing/unsubscribing to event notifications.
42 */
43@Path("kafkaService")
44public class EventExporterWebResource extends AbstractWebResource {
45
46 private final Logger log = LoggerFactory.getLogger(getClass());
47 public static final String JSON_NOT_NULL = "Registration Data cannot be empty";
48 public static final String REGISTRATION_SUCCESSFUL = "Registered Listener successfully";
49 public static final String DEREGISTRATION_SUCCESSFUL = "De-Registered Listener successfully";
50 public static final String EVENT_SUBSCRIPTION_SUCCESSFUL = "Event Registration successfull";
51 public static final String EVENT_SUBSCRIPTION_REMOVED = "Event De-Registration successfull";
52
53 /**
54 * Registers a listener for Onos Events.
55 *
56 * @param appName The application trying to register
57 * @return 200 OK with UUID string which should be used as Kafka Consumer
58 * Group Id
59 * @onos.rsModel KafkaRegistration
60 */
61 @POST
62 @Produces(MediaType.APPLICATION_JSON)
63 @Consumes(MediaType.APPLICATION_JSON)
64 @Path("register")
65 public Response registerKafkaListener(String appName) {
66
67 EventExporterService service = get(EventExporterService.class);
68
69 EventSubscriberGroupId groupId = service.registerListener(appName);
70
71 log.info("Registered app {}", appName);
72
73 // TODO: Should also return Kafka server information.
74 // Will glue this in when we have the config and Kafka modules ready
75 return ok(groupId.getId().toString()).build();
76 }
77
78 /**
79 * Unregisters a listener for Onos Events.
80 *
81 * @param appName The application trying to unregister
82 * @return 200 OK
83 * @onos.rsModel KafkaRegistration
84 */
85 @DELETE
Shravan Ambati7d199542016-04-22 16:09:05 -070086 @Path("unregister")
87 public Response removeKafkaListener(String appName) {
88 EventExporterService service = get(EventExporterService.class);
89
90 service.unregisterListener(appName);
91
92 return ok(DEREGISTRATION_SUCCESSFUL).build();
93 }
94
95 /**
96 * Creates subscription to a specific Onos event.
97 *
98 * @param input Subscription Data in Json format
99 * @return 200 OK if successful or 400 BAD REQUEST
100 * @onos.rsModel KafkaSubscription
101 */
102 @POST
103 @Produces(MediaType.APPLICATION_JSON)
104 @Path("subscribe")
105 public Response subscribe(InputStream input) {
106
107 EventExporterService service = get(EventExporterService.class);
108
109 try {
110 EventSubscriber sub = parseSubscriptionData(input);
111 service.subscribe(sub);
112 } catch (Exception e) {
113 log.error(e.getMessage());
114 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
115 }
116
117 return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
118 }
119
120 /**
121 * Parses Json Subscription Data from the external application.
122 *
123 * @param node node within the parsed json tree.
124 * @return parsed DTO object
125 * @throws IOException
126 */
127 private EventSubscriber parseSubscriptionData(InputStream input)
128 throws IOException {
129
130 ObjectMapper mapper = new ObjectMapper();
131 ObjectNode node = (ObjectNode) mapper.readTree(input);
132
133 checkNotNull(node, JSON_NOT_NULL);
134
135 JsonCodec<EventSubscriber> codec = codec(EventSubscriber.class);
136 return codec.decode(node, this);
137 }
138
139 /**
140 * Deletes subscription from a specific Onos event.
141 *
142 * @param input data in json format
143 * @return 200 OK if successful or 400 BAD REQUEST
144 * @onos.rsModel KafkaSubscription
145 */
146 @DELETE
Jian Lic2a542b2016-05-10 11:48:19 -0700147 @Consumes(MediaType.APPLICATION_JSON)
Shravan Ambati7d199542016-04-22 16:09:05 -0700148 @Path("unsubscribe")
149 public Response unsubscribe(InputStream input) {
150
151 EventExporterService service = get(EventExporterService.class);
152
153 try {
154 EventSubscriber sub = parseSubscriptionData(input);
155 service.subscribe(sub);
156 } catch (Exception e) {
157 log.error(e.getMessage());
158 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
159 }
160
161 return ok(EVENT_SUBSCRIPTION_REMOVED).build();
162 }
163}