blob: e2e8e57c6c6a2128edf1692742a8cde46a4a461a [file] [log] [blame]
Shravan Ambatibb6b4452016-05-04 13:25:28 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.impl;
17
18import static com.google.common.base.Preconditions.checkNotNull;
19import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
20import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
21
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070022import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
23import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
24
Shravan Ambatibb6b4452016-05-04 13:25:28 -070025import java.util.ArrayList;
26import java.util.List;
27import java.util.Map;
28import java.util.UUID;
29
30import org.apache.felix.scr.annotations.Activate;
31import org.apache.felix.scr.annotations.Component;
32import org.apache.felix.scr.annotations.Deactivate;
33import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
35import org.apache.felix.scr.annotations.Service;
36import org.onosproject.core.ApplicationId;
37import org.onosproject.core.CoreService;
38import org.onosproject.kafkaintegration.api.EventExporterService;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070039import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070040import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070041import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
42import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
43import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
44import org.onosproject.kafkaintegration.listener.ListenerFactory;
45import org.onosproject.kafkaintegration.listener.OnosEventListener;
46import org.onosproject.net.device.DeviceService;
47import org.onosproject.net.link.LinkService;
48import org.onosproject.store.serializers.KryoNamespaces;
49import org.onosproject.store.service.Serializer;
50import org.onosproject.store.service.StorageService;
51import org.slf4j.Logger;
52import org.slf4j.LoggerFactory;
53
54/**
55 * Implementation of Event Exporter Service.
56 *
57 */
58@Component(immediate = true)
59@Service
60public class EventExporterManager implements EventExporterService {
61
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
64 // Stores the currently registered applications for event export service.
65 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
66
67 private Map<Type, List<EventSubscriber>> subscriptions;
68
69 private static final String REGISTERED_APPS = "registered-applications";
70
71 private static final String SUBSCRIBED_APPS = "event-subscriptions";
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected DeviceService deviceService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected LinkService linkService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected CoreService coreService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected StorageService storageService;
84
85 private ApplicationId appId;
86
87 @Activate
88 protected void activate() {
89 appId = coreService
90 .registerApplication("org.onosproject.kafkaintegration");
91
92 registeredApps = storageService
93 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
94 .withName(REGISTERED_APPS)
95 .withSerializer(Serializer.using(KryoNamespaces.API,
96 EventSubscriberGroupId.class,
97 UUID.class))
98 .build().asJavaMap();
99
100 subscriptions = storageService
101 .<Type, List<EventSubscriber>>consistentMapBuilder()
102 .withName(SUBSCRIBED_APPS)
103 .withSerializer(Serializer.using(KryoNamespaces.API,
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -0700104 EventSubscriber.class,
105 OnosEvent.class,
106 OnosEvent.Type.class,
107 DefaultEventSubscriber.class,
108 EventSubscriberGroupId.class,
109 UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700110 .build().asJavaMap();
111
112 log.info("Started");
113 }
114
115 @Deactivate
116 protected void deactivate() {
117 log.info("Stopped");
118 }
119
120 @Override
121 public EventSubscriberGroupId registerListener(String appName) {
122
123 // TODO: Remove it once ONOS provides a mechanism for external apps
124 // to register with the core service. See Jira - 4409
125 ApplicationId externalAppId = coreService.registerApplication(appName);
126
127 return registeredApps.computeIfAbsent(externalAppId,
128 (key) -> new EventSubscriberGroupId(UUID
129 .randomUUID()));
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700130 }
131
132 @Override
133 public void unregisterListener(String appName) {
134 ApplicationId externalAppId =
135 checkNotNull(coreService.getAppId(appName));
136 registeredApps.remove(externalAppId);
137 }
138
139 @Override
140 public void subscribe(EventSubscriber subscriber)
141 throws InvalidGroupIdException, InvalidApplicationException {
142
143 checkNotNull(subscriber);
144
145 if (!registeredApplication(subscriber.appName())) {
146 throw new InvalidApplicationException("Application is not "
147 + "registered to make this request.");
148 }
149
150 if (!validGroupId(subscriber.subscriberGroupId(),
151 subscriber.appName())) {
152 throw new InvalidGroupIdException("Incorrect group id in the request");
153 }
154
155 OnosEventListener onosListener = getListener(subscriber.eventType());
156 checkNotNull(onosListener,
157 "No listener for the supported event type - {}",
158 subscriber.eventType());
159
160 applyListenerAction(subscriber.eventType(), onosListener,
161 ListenerAction.START);
162
163 // update internal state
164 List<EventSubscriber> subscriptionList =
165 subscriptions.get(subscriber.eventType());
166 if (subscriptionList == null) {
167 subscriptionList = new ArrayList<EventSubscriber>();
168 }
169 subscriptionList.add(subscriber);
170 subscriptions.put(subscriber.eventType(), subscriptionList);
171
172 log.info("Subscription for {} event by {} successfull",
173 subscriber.eventType(), subscriber.appName());
174 }
175
176 /**
177 * Checks if the application has registered.
178 *
179 * @param appName application name
180 * @return true if application has registered
181 */
182 private boolean registeredApplication(String appName) {
183
184 checkNotNull(appName);
185 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
186 if (registeredApps.containsKey(appId)) {
187 return true;
188 }
189
190 log.debug("{} is not registered", appName);
191 return false;
192 }
193
194 /**
195 * Actions that can be performed on the ONOS Event Listeners.
196 *
197 */
198 private enum ListenerAction {
199 START, STOP;
200 }
201
202 /**
203 * Applies the specified action on the Listener.
204 *
205 * @param eventType the ONOS Event type registered by the application
206 * @param onosListener ONOS event listener
207 * @param action to be performed on the listener
208 */
209 private void applyListenerAction(Type eventType,
210 OnosEventListener onosListener,
211 ListenerAction action) {
212 switch (eventType) {
213 case DEVICE:
214 if (action == ListenerAction.START) {
215 onosListener.startListener(DEVICE, deviceService);
216 } else {
217 onosListener.stopListener(DEVICE, deviceService);
218 }
219 break;
220 case LINK:
221 if (action == ListenerAction.START) {
222 onosListener.startListener(LINK, linkService);
223 } else {
224 onosListener.stopListener(LINK, linkService);
225 }
226 break;
227 default:
228 log.error("Cannot {} listener. Unsupported event type {} ",
229 action.toString(), eventType.toString());
230 }
231 }
232
233 /**
234 * Returns the ONOS event listener corresponding to the ONOS Event type.
235 *
236 * @param eventType ONOS event type
237 * @return ONOS event listener
238 */
239 private OnosEventListener getListener(Type eventType) {
240 checkNotNull(eventType);
241 ListenerFactory factory = ListenerFactory.getInstance();
242 OnosEventListener onosListener = factory.getListener(eventType);
243 return onosListener;
244 }
245
246 /**
247 * Checks if the group id is valid for this registered application.
248 *
249 * @param groupId GroupId assigned to the subscriber
250 * @param appName Registered Application name
251 * @return true if valid groupId and false otherwise
252 */
253 private boolean validGroupId(EventSubscriberGroupId groupId,
254 String appName) {
255
256 checkNotNull(groupId);
257
258 ApplicationId appId = coreService.getAppId(appName);
259 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
260 if (registeredGroupId.equals(groupId)) {
261 return true;
262 }
263
264 return false;
265 }
266
267 @Override
268 public void unsubscribe(EventSubscriber subscriber)
269 throws InvalidGroupIdException, InvalidApplicationException {
270
271 checkNotNull(subscriber);
272
273 if (!registeredApplication(subscriber.appName())) {
274 throw new InvalidApplicationException("Application is not "
275 + "registered to make this request.");
276 }
277
278 if (!validGroupId(subscriber.subscriberGroupId(),
279 subscriber.appName())) {
280 throw new InvalidGroupIdException("Incorrect group id in the request");
281 }
282
283 if (!eventSubscribed(subscriber)) {
284 log.error("No subscription to {} was found",
285 subscriber.eventType());
286 return;
287 }
288
289 // If this is the only subscriber listening for this event,
290 // stop the listener.
291 List<EventSubscriber> subscribers =
292 subscriptions.get(subscriber.eventType());
293 if (subscribers.size() == 1) {
294 OnosEventListener onosListener =
295 getListener(subscriber.eventType());
296 checkNotNull(onosListener,
297 "No listener for the supported event type - {}",
298 subscriber.eventType());
299 applyListenerAction(subscriber.eventType(), onosListener,
300 ListenerAction.STOP);
301 }
302
303 // update internal state.
304 subscribers.remove(subscriber);
305 subscriptions.put(subscriber.eventType(), subscribers);
306
307 log.info("Unsubscribed {} for {} events", subscriber.appName(),
308 subscriber.eventType());
309 }
310
311 /**
312 * Checks if the subscriber has already subscribed to the requested event
313 * type.
314 *
315 * @param subscriber the subscriber to a specific ONOS event
316 * @return true if subscriber has subscribed to the ONOS event
317 */
318 private boolean eventSubscribed(EventSubscriber subscriber) {
319
320 List<EventSubscriber> subscriberList =
321 subscriptions.get(subscriber.eventType());
322
323 if (subscriberList == null) {
324 return false;
325 }
326
327 return subscriberList.contains(subscriber);
328 }
329
330}