blob: fdb7ed5b5c4e34ee7c5af8a72287afde34827e7c [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Shravan Ambatibb6b4452016-05-04 13:25:28 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.impl;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import static com.google.common.base.Preconditions.checkNotNull;
Shravan Ambati5a11e172016-07-21 15:55:28 -070019
20import java.util.ArrayList;
21import java.util.List;
22import java.util.Map;
23import java.util.UUID;
24
Shravan Ambatibb6b4452016-05-04 13:25:28 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070033import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070034import org.onosproject.kafkaintegration.api.KafkaConfigService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070035import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
36import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070037import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Shravan Ambati5a11e172016-07-21 15:55:28 -070038import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070039import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070040import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
41import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
42import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070043import org.onosproject.store.serializers.KryoNamespaces;
44import org.onosproject.store.service.Serializer;
45import org.onosproject.store.service.StorageService;
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
Shravan Ambati5a11e172016-07-21 15:55:28 -070049import com.google.common.collect.ImmutableList;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070050
Shravan Ambatibb6b4452016-05-04 13:25:28 -070051/**
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070052 * Implementation of Event Subscription Manager.
Shravan Ambatibb6b4452016-05-04 13:25:28 -070053 *
54 */
55@Component(immediate = true)
56@Service
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070057public class EventSubscriptionManager implements EventSubscriptionService {
Shravan Ambatibb6b4452016-05-04 13:25:28 -070058
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 // Stores the currently registered applications for event export service.
62 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
63
64 private Map<Type, List<EventSubscriber>> subscriptions;
65
66 private static final String REGISTERED_APPS = "registered-applications";
67
68 private static final String SUBSCRIBED_APPS = "event-subscriptions";
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Shravan Ambatibb6b4452016-05-04 13:25:28 -070071 protected CoreService coreService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected StorageService storageService;
75
Shravan Ambati5a11e172016-07-21 15:55:28 -070076 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected KafkaConfigService kafkaConfigService;
78
Shravan Ambatibb6b4452016-05-04 13:25:28 -070079 private ApplicationId appId;
80
81 @Activate
82 protected void activate() {
83 appId = coreService
84 .registerApplication("org.onosproject.kafkaintegration");
85
86 registeredApps = storageService
87 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
88 .withName(REGISTERED_APPS)
89 .withSerializer(Serializer.using(KryoNamespaces.API,
90 EventSubscriberGroupId.class,
91 UUID.class))
92 .build().asJavaMap();
93
94 subscriptions = storageService
95 .<Type, List<EventSubscriber>>consistentMapBuilder()
96 .withName(SUBSCRIBED_APPS)
Shravan Ambati5a11e172016-07-21 15:55:28 -070097 .withSerializer(Serializer
98 .using(KryoNamespaces.API, EventSubscriber.class,
99 OnosEvent.class, OnosEvent.Type.class,
100 DefaultEventSubscriber.class,
101 EventSubscriberGroupId.class, UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700102 .build().asJavaMap();
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 log.info("Stopped");
110 }
111
112 @Override
Shravan Ambati5a11e172016-07-21 15:55:28 -0700113 public RegistrationResponse registerListener(String appName) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700114
115 // TODO: Remove it once ONOS provides a mechanism for external apps
116 // to register with the core service. See Jira - 4409
117 ApplicationId externalAppId = coreService.registerApplication(appName);
118
Shravan Ambati5a11e172016-07-21 15:55:28 -0700119 EventSubscriberGroupId id =
120 registeredApps.computeIfAbsent(externalAppId,
121 (key) -> new EventSubscriberGroupId(UUID
122 .randomUUID()));
123
124 RegistrationResponse response = new RegistrationResponse(id,
125 kafkaConfigService.getConfigParams().getIpAddress(),
126 kafkaConfigService.getConfigParams().getPort());
127
128 return response;
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700129 }
130
131 @Override
132 public void unregisterListener(String appName) {
133 ApplicationId externalAppId =
134 checkNotNull(coreService.getAppId(appName));
135 registeredApps.remove(externalAppId);
136 }
137
138 @Override
139 public void subscribe(EventSubscriber subscriber)
140 throws InvalidGroupIdException, InvalidApplicationException {
141
142 checkNotNull(subscriber);
143
144 if (!registeredApplication(subscriber.appName())) {
145 throw new InvalidApplicationException("Application is not "
146 + "registered to make this request.");
147 }
148
Shravan Ambati5a11e172016-07-21 15:55:28 -0700149 if (!validGroupId(subscriber.subscriberGroupId(),
150 subscriber.appName())) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700151 throw new InvalidGroupIdException("Incorrect group id in the request");
152 }
153
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700154 // update internal state
155 List<EventSubscriber> subscriptionList =
156 subscriptions.get(subscriber.eventType());
157 if (subscriptionList == null) {
158 subscriptionList = new ArrayList<EventSubscriber>();
159 }
160 subscriptionList.add(subscriber);
161 subscriptions.put(subscriber.eventType(), subscriptionList);
162
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700163 log.info("Subscription for {} event by {} successful",
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700164 subscriber.eventType(), subscriber.appName());
165 }
166
167 /**
168 * Checks if the application has registered.
169 *
170 * @param appName application name
171 * @return true if application has registered
172 */
173 private boolean registeredApplication(String appName) {
174
175 checkNotNull(appName);
176 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
177 if (registeredApps.containsKey(appId)) {
178 return true;
179 }
180
181 log.debug("{} is not registered", appName);
182 return false;
183 }
184
185 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700186 * Checks if the group id is valid for this registered application.
187 *
188 * @param groupId GroupId assigned to the subscriber
189 * @param appName Registered Application name
190 * @return true if valid groupId and false otherwise
191 */
192 private boolean validGroupId(EventSubscriberGroupId groupId,
193 String appName) {
194
195 checkNotNull(groupId);
196
197 ApplicationId appId = coreService.getAppId(appName);
198 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
199 if (registeredGroupId.equals(groupId)) {
200 return true;
201 }
202
203 return false;
204 }
205
206 @Override
207 public void unsubscribe(EventSubscriber subscriber)
208 throws InvalidGroupIdException, InvalidApplicationException {
209
210 checkNotNull(subscriber);
211
212 if (!registeredApplication(subscriber.appName())) {
213 throw new InvalidApplicationException("Application is not "
214 + "registered to make this request.");
215 }
216
217 if (!validGroupId(subscriber.subscriberGroupId(),
218 subscriber.appName())) {
219 throw new InvalidGroupIdException("Incorrect group id in the request");
220 }
221
222 if (!eventSubscribed(subscriber)) {
223 log.error("No subscription to {} was found",
224 subscriber.eventType());
225 return;
226 }
227
Shravan Ambatia4875d82017-01-09 13:06:51 -0800228 // update internal state.
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700229 List<EventSubscriber> subscribers =
230 subscriptions.get(subscriber.eventType());
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700231
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700232 subscribers.remove(subscriber);
233 subscriptions.put(subscriber.eventType(), subscribers);
234
235 log.info("Unsubscribed {} for {} events", subscriber.appName(),
236 subscriber.eventType());
237 }
238
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700239 @Override
240 public List<EventSubscriber> getEventSubscribers(Type type) {
241 return subscriptions.getOrDefault(type, ImmutableList.of());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700242 }
243
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700244 /**
245 * Checks if the subscriber has already subscribed to the requested event
246 * type.
247 *
248 * @param subscriber the subscriber to a specific ONOS event
249 * @return true if subscriber has subscribed to the ONOS event
250 */
251 private boolean eventSubscribed(EventSubscriber subscriber) {
252
253 List<EventSubscriber> subscriberList =
254 subscriptions.get(subscriber.eventType());
255
256 if (subscriberList == null) {
257 return false;
258 }
259
260 return subscriberList.contains(subscriber);
261 }
262
263}