blob: 3671abbbf1ac8453fd415a0a3737b1ad949d5ac1 [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Shravan Ambatibb6b4452016-05-04 13:25:28 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.impl;
17
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.collect.ImmutableList;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070019import org.onosproject.core.ApplicationId;
20import org.onosproject.core.CoreService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070021import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070022import org.onosproject.kafkaintegration.api.KafkaConfigService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070023import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
24import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070025import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070026import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070027import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070028import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070029import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
30import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070031import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.StorageService;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070034import org.osgi.service.component.annotations.Activate;
35import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.annotations.Deactivate;
37import org.osgi.service.component.annotations.Reference;
38import org.osgi.service.component.annotations.ReferenceCardinality;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070039import org.slf4j.Logger;
40import org.slf4j.LoggerFactory;
41
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042import java.util.ArrayList;
43import java.util.List;
44import java.util.Map;
45import java.util.UUID;
46
47import static com.google.common.base.Preconditions.checkNotNull;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070048
Shravan Ambatibb6b4452016-05-04 13:25:28 -070049/**
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070050 * Implementation of Event Subscription Manager.
Shravan Ambatibb6b4452016-05-04 13:25:28 -070051 *
52 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070053@Component(immediate = true, service = EventSubscriptionService.class)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070054public class EventSubscriptionManager implements EventSubscriptionService {
Shravan Ambatibb6b4452016-05-04 13:25:28 -070055
56 private final Logger log = LoggerFactory.getLogger(getClass());
57
58 // Stores the currently registered applications for event export service.
59 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
60
61 private Map<Type, List<EventSubscriber>> subscriptions;
62
63 private static final String REGISTERED_APPS = "registered-applications";
64
65 private static final String SUBSCRIBED_APPS = "event-subscriptions";
66
Ray Milkeyd84f89b2018-08-17 14:54:17 -070067 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatibb6b4452016-05-04 13:25:28 -070068 protected CoreService coreService;
69
Ray Milkeyd84f89b2018-08-17 14:54:17 -070070 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatibb6b4452016-05-04 13:25:28 -070071 protected StorageService storageService;
72
Ray Milkeyd84f89b2018-08-17 14:54:17 -070073 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambati5a11e172016-07-21 15:55:28 -070074 protected KafkaConfigService kafkaConfigService;
75
Shravan Ambatibb6b4452016-05-04 13:25:28 -070076 private ApplicationId appId;
77
78 @Activate
79 protected void activate() {
80 appId = coreService
81 .registerApplication("org.onosproject.kafkaintegration");
82
83 registeredApps = storageService
84 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
85 .withName(REGISTERED_APPS)
86 .withSerializer(Serializer.using(KryoNamespaces.API,
87 EventSubscriberGroupId.class,
88 UUID.class))
89 .build().asJavaMap();
90
91 subscriptions = storageService
92 .<Type, List<EventSubscriber>>consistentMapBuilder()
93 .withName(SUBSCRIBED_APPS)
Shravan Ambati5a11e172016-07-21 15:55:28 -070094 .withSerializer(Serializer
95 .using(KryoNamespaces.API, EventSubscriber.class,
96 OnosEvent.class, OnosEvent.Type.class,
97 DefaultEventSubscriber.class,
98 EventSubscriberGroupId.class, UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -070099 .build().asJavaMap();
100
101 log.info("Started");
102 }
103
104 @Deactivate
105 protected void deactivate() {
106 log.info("Stopped");
107 }
108
109 @Override
Shravan Ambati5a11e172016-07-21 15:55:28 -0700110 public RegistrationResponse registerListener(String appName) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700111
112 // TODO: Remove it once ONOS provides a mechanism for external apps
113 // to register with the core service. See Jira - 4409
114 ApplicationId externalAppId = coreService.registerApplication(appName);
115
Shravan Ambati5a11e172016-07-21 15:55:28 -0700116 EventSubscriberGroupId id =
117 registeredApps.computeIfAbsent(externalAppId,
118 (key) -> new EventSubscriberGroupId(UUID
119 .randomUUID()));
120
121 RegistrationResponse response = new RegistrationResponse(id,
122 kafkaConfigService.getConfigParams().getIpAddress(),
123 kafkaConfigService.getConfigParams().getPort());
124
125 return response;
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700126 }
127
128 @Override
129 public void unregisterListener(String appName) {
130 ApplicationId externalAppId =
131 checkNotNull(coreService.getAppId(appName));
132 registeredApps.remove(externalAppId);
133 }
134
135 @Override
136 public void subscribe(EventSubscriber subscriber)
137 throws InvalidGroupIdException, InvalidApplicationException {
138
139 checkNotNull(subscriber);
140
141 if (!registeredApplication(subscriber.appName())) {
142 throw new InvalidApplicationException("Application is not "
143 + "registered to make this request.");
144 }
145
Shravan Ambati5a11e172016-07-21 15:55:28 -0700146 if (!validGroupId(subscriber.subscriberGroupId(),
147 subscriber.appName())) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700148 throw new InvalidGroupIdException("Incorrect group id in the request");
149 }
150
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700151 // update internal state
152 List<EventSubscriber> subscriptionList =
153 subscriptions.get(subscriber.eventType());
154 if (subscriptionList == null) {
155 subscriptionList = new ArrayList<EventSubscriber>();
156 }
157 subscriptionList.add(subscriber);
158 subscriptions.put(subscriber.eventType(), subscriptionList);
159
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700160 log.info("Subscription for {} event by {} successful",
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700161 subscriber.eventType(), subscriber.appName());
162 }
163
164 /**
165 * Checks if the application has registered.
166 *
167 * @param appName application name
168 * @return true if application has registered
169 */
170 private boolean registeredApplication(String appName) {
171
172 checkNotNull(appName);
173 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
174 if (registeredApps.containsKey(appId)) {
175 return true;
176 }
177
178 log.debug("{} is not registered", appName);
179 return false;
180 }
181
182 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700183 * Checks if the group id is valid for this registered application.
184 *
185 * @param groupId GroupId assigned to the subscriber
186 * @param appName Registered Application name
187 * @return true if valid groupId and false otherwise
188 */
189 private boolean validGroupId(EventSubscriberGroupId groupId,
190 String appName) {
191
192 checkNotNull(groupId);
193
194 ApplicationId appId = coreService.getAppId(appName);
195 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
196 if (registeredGroupId.equals(groupId)) {
197 return true;
198 }
199
200 return false;
201 }
202
203 @Override
204 public void unsubscribe(EventSubscriber subscriber)
205 throws InvalidGroupIdException, InvalidApplicationException {
206
207 checkNotNull(subscriber);
208
209 if (!registeredApplication(subscriber.appName())) {
210 throw new InvalidApplicationException("Application is not "
211 + "registered to make this request.");
212 }
213
214 if (!validGroupId(subscriber.subscriberGroupId(),
215 subscriber.appName())) {
216 throw new InvalidGroupIdException("Incorrect group id in the request");
217 }
218
219 if (!eventSubscribed(subscriber)) {
220 log.error("No subscription to {} was found",
221 subscriber.eventType());
222 return;
223 }
224
Shravan Ambatia4875d82017-01-09 13:06:51 -0800225 // update internal state.
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700226 List<EventSubscriber> subscribers =
227 subscriptions.get(subscriber.eventType());
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700228
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700229 subscribers.remove(subscriber);
230 subscriptions.put(subscriber.eventType(), subscribers);
231
232 log.info("Unsubscribed {} for {} events", subscriber.appName(),
233 subscriber.eventType());
234 }
235
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700236 @Override
237 public List<EventSubscriber> getEventSubscribers(Type type) {
238 return subscriptions.getOrDefault(type, ImmutableList.of());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700239 }
240
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700241 /**
242 * Checks if the subscriber has already subscribed to the requested event
243 * type.
244 *
245 * @param subscriber the subscriber to a specific ONOS event
246 * @return true if subscriber has subscribed to the ONOS event
247 */
248 private boolean eventSubscribed(EventSubscriber subscriber) {
249
250 List<EventSubscriber> subscriberList =
251 subscriptions.get(subscriber.eventType());
252
253 if (subscriberList == null) {
254 return false;
255 }
256
257 return subscriberList.contains(subscriber);
258 }
259
260}