/**
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.impl;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
20import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
21
22import java.util.ArrayList;
23import java.util.List;
24import java.util.Map;
25import java.util.UUID;
26
Shravan Ambatibb6b4452016-05-04 13:25:28 -070027import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.onosproject.core.ApplicationId;
34import org.onosproject.core.CoreService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070035import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070036import org.onosproject.kafkaintegration.api.KafkaConfigService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070037import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
38import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070039import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Shravan Ambati5a11e172016-07-21 15:55:28 -070040import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070041import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070042import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
43import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
44import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070045import org.onosproject.kafkaintegration.listener.OnosEventListener;
46import org.onosproject.net.device.DeviceService;
47import org.onosproject.net.link.LinkService;
48import org.onosproject.store.serializers.KryoNamespaces;
49import org.onosproject.store.service.Serializer;
50import org.onosproject.store.service.StorageService;
51import org.slf4j.Logger;
52import org.slf4j.LoggerFactory;
53
Shravan Ambati5a11e172016-07-21 15:55:28 -070054import com.google.common.collect.ImmutableList;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070055
Shravan Ambatibb6b4452016-05-04 13:25:28 -070056/**
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070057 * Implementation of Event Subscription Manager.
Shravan Ambatibb6b4452016-05-04 13:25:28 -070058 *
59 */
60@Component(immediate = true)
61@Service
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070062public class EventSubscriptionManager implements EventSubscriptionService {
Shravan Ambatibb6b4452016-05-04 13:25:28 -070063
64 private final Logger log = LoggerFactory.getLogger(getClass());
65
66 // Stores the currently registered applications for event export service.
67 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
68
69 private Map<Type, List<EventSubscriber>> subscriptions;
70
71 private static final String REGISTERED_APPS = "registered-applications";
72
73 private static final String SUBSCRIBED_APPS = "event-subscriptions";
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected DeviceService deviceService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected LinkService linkService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected CoreService coreService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected StorageService storageService;
86
Shravan Ambati5a11e172016-07-21 15:55:28 -070087 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected KafkaConfigService kafkaConfigService;
89
Shravan Ambatibb6b4452016-05-04 13:25:28 -070090 private ApplicationId appId;
91
92 @Activate
93 protected void activate() {
94 appId = coreService
95 .registerApplication("org.onosproject.kafkaintegration");
96
97 registeredApps = storageService
98 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
99 .withName(REGISTERED_APPS)
100 .withSerializer(Serializer.using(KryoNamespaces.API,
101 EventSubscriberGroupId.class,
102 UUID.class))
103 .build().asJavaMap();
104
105 subscriptions = storageService
106 .<Type, List<EventSubscriber>>consistentMapBuilder()
107 .withName(SUBSCRIBED_APPS)
Shravan Ambati5a11e172016-07-21 15:55:28 -0700108 .withSerializer(Serializer
109 .using(KryoNamespaces.API, EventSubscriber.class,
110 OnosEvent.class, OnosEvent.Type.class,
111 DefaultEventSubscriber.class,
112 EventSubscriberGroupId.class, UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700113 .build().asJavaMap();
114
115 log.info("Started");
116 }
117
118 @Deactivate
119 protected void deactivate() {
120 log.info("Stopped");
121 }
122
123 @Override
Shravan Ambati5a11e172016-07-21 15:55:28 -0700124 public RegistrationResponse registerListener(String appName) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700125
126 // TODO: Remove it once ONOS provides a mechanism for external apps
127 // to register with the core service. See Jira - 4409
128 ApplicationId externalAppId = coreService.registerApplication(appName);
129
Shravan Ambati5a11e172016-07-21 15:55:28 -0700130 EventSubscriberGroupId id =
131 registeredApps.computeIfAbsent(externalAppId,
132 (key) -> new EventSubscriberGroupId(UUID
133 .randomUUID()));
134
135 RegistrationResponse response = new RegistrationResponse(id,
136 kafkaConfigService.getConfigParams().getIpAddress(),
137 kafkaConfigService.getConfigParams().getPort());
138
139 return response;
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700140 }
141
142 @Override
143 public void unregisterListener(String appName) {
144 ApplicationId externalAppId =
145 checkNotNull(coreService.getAppId(appName));
146 registeredApps.remove(externalAppId);
147 }
148
149 @Override
150 public void subscribe(EventSubscriber subscriber)
151 throws InvalidGroupIdException, InvalidApplicationException {
152
153 checkNotNull(subscriber);
154
155 if (!registeredApplication(subscriber.appName())) {
156 throw new InvalidApplicationException("Application is not "
157 + "registered to make this request.");
158 }
159
Shravan Ambati5a11e172016-07-21 15:55:28 -0700160 if (!validGroupId(subscriber.subscriberGroupId(),
161 subscriber.appName())) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700162 throw new InvalidGroupIdException("Incorrect group id in the request");
163 }
164
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700165 // update internal state
166 List<EventSubscriber> subscriptionList =
167 subscriptions.get(subscriber.eventType());
168 if (subscriptionList == null) {
169 subscriptionList = new ArrayList<EventSubscriber>();
170 }
171 subscriptionList.add(subscriber);
172 subscriptions.put(subscriber.eventType(), subscriptionList);
173
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700174 log.info("Subscription for {} event by {} successful",
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700175 subscriber.eventType(), subscriber.appName());
176 }
177
178 /**
179 * Checks if the application has registered.
180 *
181 * @param appName application name
182 * @return true if application has registered
183 */
184 private boolean registeredApplication(String appName) {
185
186 checkNotNull(appName);
187 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
188 if (registeredApps.containsKey(appId)) {
189 return true;
190 }
191
192 log.debug("{} is not registered", appName);
193 return false;
194 }
195
196 /**
197 * Actions that can be performed on the ONOS Event Listeners.
198 *
199 */
200 private enum ListenerAction {
201 START, STOP;
202 }
203
204 /**
205 * Applies the specified action on the Listener.
206 *
207 * @param eventType the ONOS Event type registered by the application
208 * @param onosListener ONOS event listener
209 * @param action to be performed on the listener
210 */
211 private void applyListenerAction(Type eventType,
212 OnosEventListener onosListener,
213 ListenerAction action) {
214 switch (eventType) {
215 case DEVICE:
216 if (action == ListenerAction.START) {
217 onosListener.startListener(DEVICE, deviceService);
218 } else {
219 onosListener.stopListener(DEVICE, deviceService);
220 }
221 break;
222 case LINK:
223 if (action == ListenerAction.START) {
224 onosListener.startListener(LINK, linkService);
225 } else {
226 onosListener.stopListener(LINK, linkService);
227 }
228 break;
229 default:
230 log.error("Cannot {} listener. Unsupported event type {} ",
231 action.toString(), eventType.toString());
232 }
233 }
234
235 /**
236 * Returns the ONOS event listener corresponding to the ONOS Event type.
237 *
238 * @param eventType ONOS event type
239 * @return ONOS event listener
240 */
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700241
242 /**
243 * Checks if the group id is valid for this registered application.
244 *
245 * @param groupId GroupId assigned to the subscriber
246 * @param appName Registered Application name
247 * @return true if valid groupId and false otherwise
248 */
249 private boolean validGroupId(EventSubscriberGroupId groupId,
250 String appName) {
251
252 checkNotNull(groupId);
253
254 ApplicationId appId = coreService.getAppId(appName);
255 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
256 if (registeredGroupId.equals(groupId)) {
257 return true;
258 }
259
260 return false;
261 }
262
263 @Override
264 public void unsubscribe(EventSubscriber subscriber)
265 throws InvalidGroupIdException, InvalidApplicationException {
266
267 checkNotNull(subscriber);
268
269 if (!registeredApplication(subscriber.appName())) {
270 throw new InvalidApplicationException("Application is not "
271 + "registered to make this request.");
272 }
273
274 if (!validGroupId(subscriber.subscriberGroupId(),
275 subscriber.appName())) {
276 throw new InvalidGroupIdException("Incorrect group id in the request");
277 }
278
279 if (!eventSubscribed(subscriber)) {
280 log.error("No subscription to {} was found",
281 subscriber.eventType());
282 return;
283 }
284
285 // If this is the only subscriber listening for this event,
286 // stop the listener.
287 List<EventSubscriber> subscribers =
288 subscriptions.get(subscriber.eventType());
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700289
290 // update internal state.
291 subscribers.remove(subscriber);
292 subscriptions.put(subscriber.eventType(), subscribers);
293
294 log.info("Unsubscribed {} for {} events", subscriber.appName(),
295 subscriber.eventType());
296 }
297
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700298 @Override
299 public List<EventSubscriber> getEventSubscribers(Type type) {
300 return subscriptions.getOrDefault(type, ImmutableList.of());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700301 }
302
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700303 /**
304 * Checks if the subscriber has already subscribed to the requested event
305 * type.
306 *
307 * @param subscriber the subscriber to a specific ONOS event
308 * @return true if subscriber has subscribed to the ONOS event
309 */
310 private boolean eventSubscribed(EventSubscriber subscriber) {
311
312 List<EventSubscriber> subscriberList =
313 subscriptions.get(subscriber.eventType());
314
315 if (subscriberList == null) {
316 return false;
317 }
318
319 return subscriberList.contains(subscriber);
320 }
321
322}