blob: 8f2c025b14ec983c502b511205304373b54f100e [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.rest;
17
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070018import com.fasterxml.jackson.databind.ObjectMapper;
19import com.fasterxml.jackson.databind.node.ObjectNode;
20import org.onosproject.kafkaintegration.api.EventSubscriptionService;
21import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambati5a11e172016-07-21 15:55:28 -070022import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070023import org.onosproject.rest.AbstractWebResource;
24import org.slf4j.Logger;
25import org.slf4j.LoggerFactory;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070026
Jian Lic2a542b2016-05-10 11:48:19 -070027import javax.ws.rs.Consumes;
28import javax.ws.rs.DELETE;
29import javax.ws.rs.POST;
30import javax.ws.rs.Path;
31import javax.ws.rs.Produces;
32import javax.ws.rs.core.MediaType;
33import javax.ws.rs.core.Response;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070034import java.io.IOException;
35import java.io.InputStream;
Jian Lic2a542b2016-05-10 11:48:19 -070036
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070037import static com.google.common.base.Preconditions.checkNotNull;
38import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
Ray Milkeyb784adb2018-04-02 15:33:07 -070039import static org.onlab.util.Tools.readTreeFromStream;
Shravan Ambati7d199542016-04-22 16:09:05 -070040
41/**
42 * Rest Interfaces for subscribing/unsubscribing to event notifications.
43 */
44@Path("kafkaService")
45public class EventExporterWebResource extends AbstractWebResource {
46
47 private final Logger log = LoggerFactory.getLogger(getClass());
Shravan Ambatibb6b4452016-05-04 13:25:28 -070048 public static final String JSON_NOT_NULL =
49 "Registration Data cannot be empty";
50 public static final String REGISTRATION_SUCCESSFUL =
51 "Registered Listener successfully";
52 public static final String DEREGISTRATION_SUCCESSFUL =
53 "De-Registered Listener successfully";
54 public static final String EVENT_SUBSCRIPTION_SUCCESSFUL =
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070055 "Event Registration successful";
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070056 public static final String EVENT_SUBSCRIPTION_UNSUCCESSFUL =
57 "Event subscription unsuccessful";
Shravan Ambatibb6b4452016-05-04 13:25:28 -070058 public static final String EVENT_SUBSCRIPTION_REMOVED =
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070059 "Event De-Registration successful";
60
Shravan Ambati7d199542016-04-22 16:09:05 -070061 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -070062 * Registers a listener for ONOS Events.
Shravan Ambati7d199542016-04-22 16:09:05 -070063 *
64 * @param appName The application trying to register
65 * @return 200 OK with UUID string which should be used as Kafka Consumer
Shravan Ambati5a11e172016-07-21 15:55:28 -070066 * Group Id and Kafka Server, port information.
Shravan Ambati7d199542016-04-22 16:09:05 -070067 * @onos.rsModel KafkaRegistration
68 */
69 @POST
70 @Produces(MediaType.APPLICATION_JSON)
71 @Consumes(MediaType.APPLICATION_JSON)
72 @Path("register")
73 public Response registerKafkaListener(String appName) {
74
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070075 EventSubscriptionService service = get(EventSubscriptionService.class);
Shravan Ambati7d199542016-04-22 16:09:05 -070076
Shravan Ambati5a11e172016-07-21 15:55:28 -070077 RegistrationResponse response = service.registerListener(appName);
78
79 ObjectNode result = mapper().createObjectNode();
80 result.put("groupId", response.getGroupId().getId().toString());
81 result.put("ipAddress", response.getIpAddress());
82 result.put("port", response.getPort());
Shravan Ambati7d199542016-04-22 16:09:05 -070083
84 log.info("Registered app {}", appName);
Shravan Ambati5a11e172016-07-21 15:55:28 -070085
86 return ok(result.toString()).build();
Shravan Ambati7d199542016-04-22 16:09:05 -070087 }
88
89 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -070090 * Unregisters a listener for ONOS Events.
Shravan Ambati7d199542016-04-22 16:09:05 -070091 *
92 * @param appName The application trying to unregister
93 * @return 200 OK
94 * @onos.rsModel KafkaRegistration
95 */
96 @DELETE
Shravan Ambati7d199542016-04-22 16:09:05 -070097 @Path("unregister")
98 public Response removeKafkaListener(String appName) {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070099 EventSubscriptionService service = get(EventSubscriptionService.class);
Shravan Ambati7d199542016-04-22 16:09:05 -0700100
101 service.unregisterListener(appName);
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -0700102 log.info("Unregistered app {}", appName);
Shravan Ambati7d199542016-04-22 16:09:05 -0700103 return ok(DEREGISTRATION_SUCCESSFUL).build();
104 }
105
106 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700107 * Creates subscription to a specific ONOS event.
Shravan Ambati7d199542016-04-22 16:09:05 -0700108 *
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700109 * @param input Subscription Data in JSON format
Shravan Ambati7d199542016-04-22 16:09:05 -0700110 * @return 200 OK if successful or 400 BAD REQUEST
111 * @onos.rsModel KafkaSubscription
112 */
113 @POST
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -0700114 @Consumes(MediaType.APPLICATION_JSON)
Shravan Ambati7d199542016-04-22 16:09:05 -0700115 @Produces(MediaType.APPLICATION_JSON)
116 @Path("subscribe")
117 public Response subscribe(InputStream input) {
118
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700119 EventSubscriptionService service = get(EventSubscriptionService.class);
Shravan Ambati7d199542016-04-22 16:09:05 -0700120
121 try {
122 EventSubscriber sub = parseSubscriptionData(input);
123 service.subscribe(sub);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700124 // It will subscribe to all the topics. Not only the one that is sent by the consumer.
Shravan Ambati7d199542016-04-22 16:09:05 -0700125 } catch (Exception e) {
126 log.error(e.getMessage());
127 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
128 }
129
130 return ok(EVENT_SUBSCRIPTION_SUCCESSFUL).build();
131 }
132
133 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700134 * Parses JSON Subscription Data from the external application.
Shravan Ambati7d199542016-04-22 16:09:05 -0700135 *
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700136 * @param input Subscription Data in JSON format
Shravan Ambati7d199542016-04-22 16:09:05 -0700137 * @return parsed DTO object
138 * @throws IOException
139 */
140 private EventSubscriber parseSubscriptionData(InputStream input)
141 throws IOException {
142
143 ObjectMapper mapper = new ObjectMapper();
Ray Milkeyb784adb2018-04-02 15:33:07 -0700144 ObjectNode node = readTreeFromStream(mapper, input);
Shravan Ambati7d199542016-04-22 16:09:05 -0700145 checkNotNull(node, JSON_NOT_NULL);
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -0700146 EventSubscriber codec = codec(EventSubscriber.class).decode(node, this);
147 checkNotNull(codec, JSON_NOT_NULL);
148 return codec;
Shravan Ambati7d199542016-04-22 16:09:05 -0700149 }
150
151 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700152 * Deletes subscription from a specific ONOS event.
Shravan Ambati7d199542016-04-22 16:09:05 -0700153 *
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700154 * @param input data in JSON format
Shravan Ambati7d199542016-04-22 16:09:05 -0700155 * @return 200 OK if successful or 400 BAD REQUEST
156 * @onos.rsModel KafkaSubscription
157 */
158 @DELETE
Jian Lic2a542b2016-05-10 11:48:19 -0700159 @Consumes(MediaType.APPLICATION_JSON)
Shravan Ambati7d199542016-04-22 16:09:05 -0700160 @Path("unsubscribe")
161 public Response unsubscribe(InputStream input) {
162
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700163 EventSubscriptionService service = get(EventSubscriptionService.class);
Shravan Ambati7d199542016-04-22 16:09:05 -0700164
165 try {
166 EventSubscriber sub = parseSubscriptionData(input);
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700167 service.unsubscribe(sub);
Shravan Ambati7d199542016-04-22 16:09:05 -0700168 } catch (Exception e) {
169 log.error(e.getMessage());
170 return Response.status(BAD_REQUEST).entity(e.getMessage()).build();
171 }
172
173 return ok(EVENT_SUBSCRIPTION_REMOVED).build();
174 }
175}