/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
20import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
21
22import java.util.ArrayList;
23import java.util.List;
24import java.util.Map;
25import java.util.UUID;
26
27import org.apache.felix.scr.annotations.Activate;
28import org.apache.felix.scr.annotations.Component;
29import org.apache.felix.scr.annotations.Deactivate;
30import org.apache.felix.scr.annotations.Reference;
31import org.apache.felix.scr.annotations.ReferenceCardinality;
32import org.apache.felix.scr.annotations.Service;
33import org.onosproject.core.ApplicationId;
34import org.onosproject.core.CoreService;
35import org.onosproject.kafkaintegration.api.EventExporterService;
36import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
37import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
38import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
39import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
40import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
41import org.onosproject.kafkaintegration.listener.ListenerFactory;
42import org.onosproject.kafkaintegration.listener.OnosEventListener;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.link.LinkService;
45import org.onosproject.store.serializers.KryoNamespaces;
46import org.onosproject.store.service.Serializer;
47import org.onosproject.store.service.StorageService;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
51/**
52 * Implementation of Event Exporter Service.
53 *
54 */
55@Component(immediate = true)
56@Service
57public class EventExporterManager implements EventExporterService {
58
59 private final Logger log = LoggerFactory.getLogger(getClass());
60
61 // Stores the currently registered applications for event export service.
62 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
63
64 private Map<Type, List<EventSubscriber>> subscriptions;
65
66 private static final String REGISTERED_APPS = "registered-applications";
67
68 private static final String SUBSCRIBED_APPS = "event-subscriptions";
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected DeviceService deviceService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected LinkService linkService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected CoreService coreService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected StorageService storageService;
81
82 private ApplicationId appId;
83
84 @Activate
85 protected void activate() {
86 appId = coreService
87 .registerApplication("org.onosproject.kafkaintegration");
88
89 registeredApps = storageService
90 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
91 .withName(REGISTERED_APPS)
92 .withSerializer(Serializer.using(KryoNamespaces.API,
93 EventSubscriberGroupId.class,
94 UUID.class))
95 .build().asJavaMap();
96
97 subscriptions = storageService
98 .<Type, List<EventSubscriber>>consistentMapBuilder()
99 .withName(SUBSCRIBED_APPS)
100 .withSerializer(Serializer.using(KryoNamespaces.API,
101 EventSubscriber.class))
102 .build().asJavaMap();
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 log.info("Stopped");
110 }
111
112 @Override
113 public EventSubscriberGroupId registerListener(String appName) {
114
115 // TODO: Remove it once ONOS provides a mechanism for external apps
116 // to register with the core service. See Jira - 4409
117 ApplicationId externalAppId = coreService.registerApplication(appName);
118
119 return registeredApps.computeIfAbsent(externalAppId,
120 (key) -> new EventSubscriberGroupId(UUID
121 .randomUUID()));
122
123 }
124
125 @Override
126 public void unregisterListener(String appName) {
127 ApplicationId externalAppId =
128 checkNotNull(coreService.getAppId(appName));
129 registeredApps.remove(externalAppId);
130 }
131
132 @Override
133 public void subscribe(EventSubscriber subscriber)
134 throws InvalidGroupIdException, InvalidApplicationException {
135
136 checkNotNull(subscriber);
137
138 if (!registeredApplication(subscriber.appName())) {
139 throw new InvalidApplicationException("Application is not "
140 + "registered to make this request.");
141 }
142
143 if (!validGroupId(subscriber.subscriberGroupId(),
144 subscriber.appName())) {
145 throw new InvalidGroupIdException("Incorrect group id in the request");
146 }
147
148 OnosEventListener onosListener = getListener(subscriber.eventType());
149 checkNotNull(onosListener,
150 "No listener for the supported event type - {}",
151 subscriber.eventType());
152
153 applyListenerAction(subscriber.eventType(), onosListener,
154 ListenerAction.START);
155
156 // update internal state
157 List<EventSubscriber> subscriptionList =
158 subscriptions.get(subscriber.eventType());
159 if (subscriptionList == null) {
160 subscriptionList = new ArrayList<EventSubscriber>();
161 }
162 subscriptionList.add(subscriber);
163 subscriptions.put(subscriber.eventType(), subscriptionList);
164
165 log.info("Subscription for {} event by {} successfull",
166 subscriber.eventType(), subscriber.appName());
167 }
168
169 /**
170 * Checks if the application has registered.
171 *
172 * @param appName application name
173 * @return true if application has registered
174 */
175 private boolean registeredApplication(String appName) {
176
177 checkNotNull(appName);
178 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
179 if (registeredApps.containsKey(appId)) {
180 return true;
181 }
182
183 log.debug("{} is not registered", appName);
184 return false;
185 }
186
187 /**
188 * Actions that can be performed on the ONOS Event Listeners.
189 *
190 */
191 private enum ListenerAction {
192 START, STOP;
193 }
194
195 /**
196 * Applies the specified action on the Listener.
197 *
198 * @param eventType the ONOS Event type registered by the application
199 * @param onosListener ONOS event listener
200 * @param action to be performed on the listener
201 */
202 private void applyListenerAction(Type eventType,
203 OnosEventListener onosListener,
204 ListenerAction action) {
205 switch (eventType) {
206 case DEVICE:
207 if (action == ListenerAction.START) {
208 onosListener.startListener(DEVICE, deviceService);
209 } else {
210 onosListener.stopListener(DEVICE, deviceService);
211 }
212 break;
213 case LINK:
214 if (action == ListenerAction.START) {
215 onosListener.startListener(LINK, linkService);
216 } else {
217 onosListener.stopListener(LINK, linkService);
218 }
219 break;
220 default:
221 log.error("Cannot {} listener. Unsupported event type {} ",
222 action.toString(), eventType.toString());
223 }
224 }
225
226 /**
227 * Returns the ONOS event listener corresponding to the ONOS Event type.
228 *
229 * @param eventType ONOS event type
230 * @return ONOS event listener
231 */
232 private OnosEventListener getListener(Type eventType) {
233 checkNotNull(eventType);
234 ListenerFactory factory = ListenerFactory.getInstance();
235 OnosEventListener onosListener = factory.getListener(eventType);
236 return onosListener;
237 }
238
239 /**
240 * Checks if the group id is valid for this registered application.
241 *
242 * @param groupId GroupId assigned to the subscriber
243 * @param appName Registered Application name
244 * @return true if valid groupId and false otherwise
245 */
246 private boolean validGroupId(EventSubscriberGroupId groupId,
247 String appName) {
248
249 checkNotNull(groupId);
250
251 ApplicationId appId = coreService.getAppId(appName);
252 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
253 if (registeredGroupId.equals(groupId)) {
254 return true;
255 }
256
257 return false;
258 }
259
260 @Override
261 public void unsubscribe(EventSubscriber subscriber)
262 throws InvalidGroupIdException, InvalidApplicationException {
263
264 checkNotNull(subscriber);
265
266 if (!registeredApplication(subscriber.appName())) {
267 throw new InvalidApplicationException("Application is not "
268 + "registered to make this request.");
269 }
270
271 if (!validGroupId(subscriber.subscriberGroupId(),
272 subscriber.appName())) {
273 throw new InvalidGroupIdException("Incorrect group id in the request");
274 }
275
276 if (!eventSubscribed(subscriber)) {
277 log.error("No subscription to {} was found",
278 subscriber.eventType());
279 return;
280 }
281
282 // If this is the only subscriber listening for this event,
283 // stop the listener.
284 List<EventSubscriber> subscribers =
285 subscriptions.get(subscriber.eventType());
286 if (subscribers.size() == 1) {
287 OnosEventListener onosListener =
288 getListener(subscriber.eventType());
289 checkNotNull(onosListener,
290 "No listener for the supported event type - {}",
291 subscriber.eventType());
292 applyListenerAction(subscriber.eventType(), onosListener,
293 ListenerAction.STOP);
294 }
295
296 // update internal state.
297 subscribers.remove(subscriber);
298 subscriptions.put(subscriber.eventType(), subscribers);
299
300 log.info("Unsubscribed {} for {} events", subscriber.appName(),
301 subscriber.eventType());
302 }
303
304 /**
305 * Checks if the subscriber has already subscribed to the requested event
306 * type.
307 *
308 * @param subscriber the subscriber to a specific ONOS event
309 * @return true if subscriber has subscribed to the ONOS event
310 */
311 private boolean eventSubscribed(EventSubscriber subscriber) {
312
313 List<EventSubscriber> subscriberList =
314 subscriptions.get(subscriber.eventType());
315
316 if (subscriberList == null) {
317 return false;
318 }
319
320 return subscriberList.contains(subscriber);
321 }
322
323}