blob: e3b7cc684dc3c8851e6995c504ce273b06bd8dff [file] [log] [blame]
Shravan Ambati7d199542016-04-22 16:09:05 -07001/**
2 * Copyright 2016-present Open Networking Laboratory
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6
7 * http://www.apache.org/licenses/LICENSE-2.0
8
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15package org.onosproject.kafkaintegration.rest;
16
17import static com.google.common.base.Preconditions.checkNotNull;
18import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
19
20import java.io.IOException;
21import java.io.InputStream;
22
23import javax.ws.rs.Consumes;
24import javax.ws.rs.DELETE;
25import javax.ws.rs.POST;
26import javax.ws.rs.Path;
27import javax.ws.rs.Produces;
28import javax.ws.rs.core.MediaType;
29import javax.ws.rs.core.Response;
30
31import org.onosproject.codec.JsonCodec;
32import org.onosproject.kafkaintegration.api.EventExporterService;
33import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
34import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
35import org.onosproject.rest.AbstractWebResource;
36import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
39import com.fasterxml.jackson.databind.ObjectMapper;
40import com.fasterxml.jackson.databind.node.ObjectNode;
41
42/**
43 * Rest Interfaces for subscribing/unsubscribing to event notifications.
44 */
45@Path("kafkaService")
46public class EventExporterWebResource extends AbstractWebResource {
47
48 private final Logger log = LoggerFactory.getLogger(getClass());
49 public static final String JSON_NOT_NULL = "Registration Data cannot be empty";
50 public static final String REGISTRATION_SUCCESSFUL = "Registered Listener successfully";
51 public static final String DEREGISTRATION_SUCCESSFUL = "De-Registered Listener successfully";
52 public static final String EVENT_SUBSCRIPTION_SUCCESSFUL = "Event Registration successfull";
53 public static final String EVENT_SUBSCRIPTION_REMOVED = "Event De-Registration successfull";
54
55 /**
56 * Registers a listener for Onos Events.
57 *
58 * @param appName The application trying to register
59 * @return 200 OK with UUID string which should be used as Kafka Consumer
60 * Group Id
61 * @onos.rsModel KafkaRegistration
62 */
63 @POST
64 @Produces(MediaType.APPLICATION_JSON)
65 @Consumes(MediaType.APPLICATION_JSON)
66 @Path("register")
67 public Response registerKafkaListener(String appName) {
68
69 EventExporterService service = get(EventExporterService.class);
70
71 EventSubscriberGroupId groupId = service.registerListener(appName);
72
73 log.info("Registered app {}", appName);
74
75 // TODO: Should also return Kafka server information.
76 // Will glue this in when we have the config and Kafka modules ready
77 return ok(groupId.getId().toString()).build();
78 }
79
80 /**
81 * Unregisters a listener for Onos Events.
82 *
83 * @param appName The application trying to unregister
84 * @return 200 OK
85 * @onos.rsModel KafkaRegistration
86 */
87 @DELETE
88 @Produces(MediaType.APPLICATION_JSON)
89 @Consumes(MediaType.APPLICATION_JSON)
90 @Path("unregister")
91 public Response removeKafkaListener(String appName) {
92 EventExporterService service = get(EventExporterService.class);
93
94 service.unregisterListener(appName);
95
96 return ok(DEREGISTRATION_SUCCESSFUL).build();
97 }
98
99 /**
100 * Creates subscription to a specific Onos event.
101 *
102 * @param input Subscription Data in Json format
103 * @return 200 OK if successful or 400 BAD REQUEST
104 * @onos.rsModel KafkaSubscription
105 */
106 @POST
107 @Produces(MediaType.APPLICATION_JSON)
108 @Path("subscribe")
109 public Response subscribe(InputStream input) {
110
111 EventExporterService service = get(EventExporterService.class);
112
113 try {
114 EventSubscriber sub = parseSubscriptionData(input);
115 service.subscribe(sub);
116 } catch (Exception e) {
117 log.error(e.getMessage());
118 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
119 }
120
121 return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
122 }
123
124 /**
125 * Parses Json Subscription Data from the external application.
126 *
127 * @param node node within the parsed json tree.
128 * @return parsed DTO object
129 * @throws IOException
130 */
131 private EventSubscriber parseSubscriptionData(InputStream input)
132 throws IOException {
133
134 ObjectMapper mapper = new ObjectMapper();
135 ObjectNode node = (ObjectNode) mapper.readTree(input);
136
137 checkNotNull(node, JSON_NOT_NULL);
138
139 JsonCodec<EventSubscriber> codec = codec(EventSubscriber.class);
140 return codec.decode(node, this);
141 }
142
143 /**
144 * Deletes subscription from a specific Onos event.
145 *
146 * @param input data in json format
147 * @return 200 OK if successful or 400 BAD REQUEST
148 * @onos.rsModel KafkaSubscription
149 */
150 @DELETE
151 @Produces(MediaType.APPLICATION_JSON)
152 @Path("unsubscribe")
153 public Response unsubscribe(InputStream input) {
154
155 EventExporterService service = get(EventExporterService.class);
156
157 try {
158 EventSubscriber sub = parseSubscriptionData(input);
159 service.subscribe(sub);
160 } catch (Exception e) {
161 log.error(e.getMessage());
162 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
163 }
164
165 return ok(EVENT_SUBSCRIPTION_REMOVED).build();
166 }
167}