blob: ebbac6b817f5b75e2c99c1e2f58b28b84ec30b3d [file] [log] [blame]
/**
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kafkaintegration.impl;
17
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070018import com.google.common.collect.ImmutableList;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070019import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070027import org.onosproject.kafkaintegration.api.EventSubscriptionService;
28import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
29import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070030import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070031import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070032import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
33import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
34import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
35import org.onosproject.kafkaintegration.listener.ListenerFactory;
36import org.onosproject.kafkaintegration.listener.OnosEventListener;
37import org.onosproject.net.device.DeviceService;
38import org.onosproject.net.link.LinkService;
39import org.onosproject.store.serializers.KryoNamespaces;
40import org.onosproject.store.service.Serializer;
41import org.onosproject.store.service.StorageService;
42import org.slf4j.Logger;
43import org.slf4j.LoggerFactory;
44
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070045import java.util.ArrayList;
46import java.util.List;
47import java.util.Map;
48import java.util.UUID;
49
50import static com.google.common.base.Preconditions.checkNotNull;
51import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
52import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
53
/**
 * Implementation of Event Subscription Manager.
 */
58@Component(immediate = true)
59@Service
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070060public class EventSubscriptionManager implements EventSubscriptionService {
Shravan Ambatibb6b4452016-05-04 13:25:28 -070061
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
64 // Stores the currently registered applications for event export service.
65 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
66
67 private Map<Type, List<EventSubscriber>> subscriptions;
68
69 private static final String REGISTERED_APPS = "registered-applications";
70
71 private static final String SUBSCRIBED_APPS = "event-subscriptions";
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected DeviceService deviceService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected LinkService linkService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected CoreService coreService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected StorageService storageService;
84
85 private ApplicationId appId;
86
87 @Activate
88 protected void activate() {
89 appId = coreService
90 .registerApplication("org.onosproject.kafkaintegration");
91
92 registeredApps = storageService
93 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
94 .withName(REGISTERED_APPS)
95 .withSerializer(Serializer.using(KryoNamespaces.API,
96 EventSubscriberGroupId.class,
97 UUID.class))
98 .build().asJavaMap();
99
100 subscriptions = storageService
101 .<Type, List<EventSubscriber>>consistentMapBuilder()
102 .withName(SUBSCRIBED_APPS)
103 .withSerializer(Serializer.using(KryoNamespaces.API,
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -0700104 EventSubscriber.class,
105 OnosEvent.class,
106 OnosEvent.Type.class,
107 DefaultEventSubscriber.class,
108 EventSubscriberGroupId.class,
109 UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700110 .build().asJavaMap();
111
112 log.info("Started");
113 }
114
115 @Deactivate
116 protected void deactivate() {
117 log.info("Stopped");
118 }
119
120 @Override
121 public EventSubscriberGroupId registerListener(String appName) {
122
123 // TODO: Remove it once ONOS provides a mechanism for external apps
124 // to register with the core service. See Jira - 4409
125 ApplicationId externalAppId = coreService.registerApplication(appName);
126
127 return registeredApps.computeIfAbsent(externalAppId,
128 (key) -> new EventSubscriberGroupId(UUID
129 .randomUUID()));
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700130 }
131
132 @Override
133 public void unregisterListener(String appName) {
134 ApplicationId externalAppId =
135 checkNotNull(coreService.getAppId(appName));
136 registeredApps.remove(externalAppId);
137 }
138
139 @Override
140 public void subscribe(EventSubscriber subscriber)
141 throws InvalidGroupIdException, InvalidApplicationException {
142
143 checkNotNull(subscriber);
144
145 if (!registeredApplication(subscriber.appName())) {
146 throw new InvalidApplicationException("Application is not "
147 + "registered to make this request.");
148 }
149
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700150 if (!validGroupId(subscriber.subscriberGroupId(), subscriber.appName())) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700151 throw new InvalidGroupIdException("Incorrect group id in the request");
152 }
153
154 OnosEventListener onosListener = getListener(subscriber.eventType());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700155 checkNotNull(onosListener, "No listener for the supported event type - {}", subscriber.eventType());
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700156
157 applyListenerAction(subscriber.eventType(), onosListener,
158 ListenerAction.START);
159
160 // update internal state
161 List<EventSubscriber> subscriptionList =
162 subscriptions.get(subscriber.eventType());
163 if (subscriptionList == null) {
164 subscriptionList = new ArrayList<EventSubscriber>();
165 }
166 subscriptionList.add(subscriber);
167 subscriptions.put(subscriber.eventType(), subscriptionList);
168
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700169 log.info("Subscription for {} event by {} successful",
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700170 subscriber.eventType(), subscriber.appName());
171 }
172
173 /**
174 * Checks if the application has registered.
175 *
176 * @param appName application name
177 * @return true if application has registered
178 */
179 private boolean registeredApplication(String appName) {
180
181 checkNotNull(appName);
182 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
183 if (registeredApps.containsKey(appId)) {
184 return true;
185 }
186
187 log.debug("{} is not registered", appName);
188 return false;
189 }
190
191 /**
192 * Actions that can be performed on the ONOS Event Listeners.
193 *
194 */
195 private enum ListenerAction {
196 START, STOP;
197 }
198
199 /**
200 * Applies the specified action on the Listener.
201 *
202 * @param eventType the ONOS Event type registered by the application
203 * @param onosListener ONOS event listener
204 * @param action to be performed on the listener
205 */
206 private void applyListenerAction(Type eventType,
207 OnosEventListener onosListener,
208 ListenerAction action) {
209 switch (eventType) {
210 case DEVICE:
211 if (action == ListenerAction.START) {
212 onosListener.startListener(DEVICE, deviceService);
213 } else {
214 onosListener.stopListener(DEVICE, deviceService);
215 }
216 break;
217 case LINK:
218 if (action == ListenerAction.START) {
219 onosListener.startListener(LINK, linkService);
220 } else {
221 onosListener.stopListener(LINK, linkService);
222 }
223 break;
224 default:
225 log.error("Cannot {} listener. Unsupported event type {} ",
226 action.toString(), eventType.toString());
227 }
228 }
229
230 /**
231 * Returns the ONOS event listener corresponding to the ONOS Event type.
232 *
233 * @param eventType ONOS event type
234 * @return ONOS event listener
235 */
236 private OnosEventListener getListener(Type eventType) {
237 checkNotNull(eventType);
238 ListenerFactory factory = ListenerFactory.getInstance();
239 OnosEventListener onosListener = factory.getListener(eventType);
240 return onosListener;
241 }
242
243 /**
244 * Checks if the group id is valid for this registered application.
245 *
246 * @param groupId GroupId assigned to the subscriber
247 * @param appName Registered Application name
248 * @return true if valid groupId and false otherwise
249 */
250 private boolean validGroupId(EventSubscriberGroupId groupId,
251 String appName) {
252
253 checkNotNull(groupId);
254
255 ApplicationId appId = coreService.getAppId(appName);
256 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
257 if (registeredGroupId.equals(groupId)) {
258 return true;
259 }
260
261 return false;
262 }
263
264 @Override
265 public void unsubscribe(EventSubscriber subscriber)
266 throws InvalidGroupIdException, InvalidApplicationException {
267
268 checkNotNull(subscriber);
269
270 if (!registeredApplication(subscriber.appName())) {
271 throw new InvalidApplicationException("Application is not "
272 + "registered to make this request.");
273 }
274
275 if (!validGroupId(subscriber.subscriberGroupId(),
276 subscriber.appName())) {
277 throw new InvalidGroupIdException("Incorrect group id in the request");
278 }
279
280 if (!eventSubscribed(subscriber)) {
281 log.error("No subscription to {} was found",
282 subscriber.eventType());
283 return;
284 }
285
286 // If this is the only subscriber listening for this event,
287 // stop the listener.
288 List<EventSubscriber> subscribers =
289 subscriptions.get(subscriber.eventType());
290 if (subscribers.size() == 1) {
291 OnosEventListener onosListener =
292 getListener(subscriber.eventType());
293 checkNotNull(onosListener,
294 "No listener for the supported event type - {}",
295 subscriber.eventType());
296 applyListenerAction(subscriber.eventType(), onosListener,
297 ListenerAction.STOP);
298 }
299
300 // update internal state.
301 subscribers.remove(subscriber);
302 subscriptions.put(subscriber.eventType(), subscribers);
303
304 log.info("Unsubscribed {} for {} events", subscriber.appName(),
305 subscriber.eventType());
306 }
307
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700308 @Override
309 public List<EventSubscriber> getEventSubscribers(Type type) {
310 return subscriptions.getOrDefault(type, ImmutableList.of());
311
312 }
313
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700314 /**
315 * Checks if the subscriber has already subscribed to the requested event
316 * type.
317 *
318 * @param subscriber the subscriber to a specific ONOS event
319 * @return true if subscriber has subscribed to the ONOS event
320 */
321 private boolean eventSubscribed(EventSubscriber subscriber) {
322
323 List<EventSubscriber> subscriberList =
324 subscriptions.get(subscriber.eventType());
325
326 if (subscriberList == null) {
327 return false;
328 }
329
330 return subscriberList.contains(subscriber);
331 }
332
333}