blob: 4fee5cb80b841d05d385d518baf0b47848b340b0 [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Shravan Ambatibb6b4452016-05-04 13:25:28 -07002 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kafkaintegration.impl;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import static com.google.common.base.Preconditions.checkNotNull;
Shravan Ambati5a11e172016-07-21 15:55:28 -070019
20import java.util.ArrayList;
21import java.util.List;
22import java.util.Map;
23import java.util.UUID;
24
Shravan Ambatibb6b4452016-05-04 13:25:28 -070025import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
28import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
30import org.apache.felix.scr.annotations.Service;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070033import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070034import org.onosproject.kafkaintegration.api.KafkaConfigService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070035import org.onosproject.kafkaintegration.api.dto.DefaultEventSubscriber;
36import org.onosproject.kafkaintegration.api.dto.EventSubscriber;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070037import org.onosproject.kafkaintegration.api.dto.EventSubscriberGroupId;
Shravan Ambati5a11e172016-07-21 15:55:28 -070038import org.onosproject.kafkaintegration.api.dto.RegistrationResponse;
Sanjana Agarwaleb9f0c52016-06-07 11:10:34 -070039import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070040import org.onosproject.kafkaintegration.api.dto.OnosEvent.Type;
41import org.onosproject.kafkaintegration.errors.InvalidApplicationException;
42import org.onosproject.kafkaintegration.errors.InvalidGroupIdException;
Shravan Ambatibb6b4452016-05-04 13:25:28 -070043import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.link.LinkService;
45import org.onosproject.store.serializers.KryoNamespaces;
46import org.onosproject.store.service.Serializer;
47import org.onosproject.store.service.StorageService;
48import org.slf4j.Logger;
49import org.slf4j.LoggerFactory;
50
Shravan Ambati5a11e172016-07-21 15:55:28 -070051import com.google.common.collect.ImmutableList;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070052
Shravan Ambatibb6b4452016-05-04 13:25:28 -070053/**
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070054 * Implementation of Event Subscription Manager.
Shravan Ambatibb6b4452016-05-04 13:25:28 -070055 *
56 */
57@Component(immediate = true)
58@Service
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070059public class EventSubscriptionManager implements EventSubscriptionService {
Shravan Ambatibb6b4452016-05-04 13:25:28 -070060
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
63 // Stores the currently registered applications for event export service.
64 private Map<ApplicationId, EventSubscriberGroupId> registeredApps;
65
66 private Map<Type, List<EventSubscriber>> subscriptions;
67
68 private static final String REGISTERED_APPS = "registered-applications";
69
70 private static final String SUBSCRIBED_APPS = "event-subscriptions";
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected DeviceService deviceService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 protected LinkService linkService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 protected CoreService coreService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82 protected StorageService storageService;
83
Shravan Ambati5a11e172016-07-21 15:55:28 -070084 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected KafkaConfigService kafkaConfigService;
86
Shravan Ambatibb6b4452016-05-04 13:25:28 -070087 private ApplicationId appId;
88
89 @Activate
90 protected void activate() {
91 appId = coreService
92 .registerApplication("org.onosproject.kafkaintegration");
93
94 registeredApps = storageService
95 .<ApplicationId, EventSubscriberGroupId>consistentMapBuilder()
96 .withName(REGISTERED_APPS)
97 .withSerializer(Serializer.using(KryoNamespaces.API,
98 EventSubscriberGroupId.class,
99 UUID.class))
100 .build().asJavaMap();
101
102 subscriptions = storageService
103 .<Type, List<EventSubscriber>>consistentMapBuilder()
104 .withName(SUBSCRIBED_APPS)
Shravan Ambati5a11e172016-07-21 15:55:28 -0700105 .withSerializer(Serializer
106 .using(KryoNamespaces.API, EventSubscriber.class,
107 OnosEvent.class, OnosEvent.Type.class,
108 DefaultEventSubscriber.class,
109 EventSubscriberGroupId.class, UUID.class))
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700110 .build().asJavaMap();
111
112 log.info("Started");
113 }
114
115 @Deactivate
116 protected void deactivate() {
117 log.info("Stopped");
118 }
119
120 @Override
Shravan Ambati5a11e172016-07-21 15:55:28 -0700121 public RegistrationResponse registerListener(String appName) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700122
123 // TODO: Remove it once ONOS provides a mechanism for external apps
124 // to register with the core service. See Jira - 4409
125 ApplicationId externalAppId = coreService.registerApplication(appName);
126
Shravan Ambati5a11e172016-07-21 15:55:28 -0700127 EventSubscriberGroupId id =
128 registeredApps.computeIfAbsent(externalAppId,
129 (key) -> new EventSubscriberGroupId(UUID
130 .randomUUID()));
131
132 RegistrationResponse response = new RegistrationResponse(id,
133 kafkaConfigService.getConfigParams().getIpAddress(),
134 kafkaConfigService.getConfigParams().getPort());
135
136 return response;
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700137 }
138
139 @Override
140 public void unregisterListener(String appName) {
141 ApplicationId externalAppId =
142 checkNotNull(coreService.getAppId(appName));
143 registeredApps.remove(externalAppId);
144 }
145
146 @Override
147 public void subscribe(EventSubscriber subscriber)
148 throws InvalidGroupIdException, InvalidApplicationException {
149
150 checkNotNull(subscriber);
151
152 if (!registeredApplication(subscriber.appName())) {
153 throw new InvalidApplicationException("Application is not "
154 + "registered to make this request.");
155 }
156
Shravan Ambati5a11e172016-07-21 15:55:28 -0700157 if (!validGroupId(subscriber.subscriberGroupId(),
158 subscriber.appName())) {
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700159 throw new InvalidGroupIdException("Incorrect group id in the request");
160 }
161
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700162 // update internal state
163 List<EventSubscriber> subscriptionList =
164 subscriptions.get(subscriber.eventType());
165 if (subscriptionList == null) {
166 subscriptionList = new ArrayList<EventSubscriber>();
167 }
168 subscriptionList.add(subscriber);
169 subscriptions.put(subscriber.eventType(), subscriptionList);
170
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700171 log.info("Subscription for {} event by {} successful",
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700172 subscriber.eventType(), subscriber.appName());
173 }
174
175 /**
176 * Checks if the application has registered.
177 *
178 * @param appName application name
179 * @return true if application has registered
180 */
181 private boolean registeredApplication(String appName) {
182
183 checkNotNull(appName);
184 ApplicationId appId = checkNotNull(coreService.getAppId(appName));
185 if (registeredApps.containsKey(appId)) {
186 return true;
187 }
188
189 log.debug("{} is not registered", appName);
190 return false;
191 }
192
193 /**
194 * Actions that can be performed on the ONOS Event Listeners.
195 *
196 */
197 private enum ListenerAction {
198 START, STOP;
199 }
200
201 /**
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700202 * Checks if the group id is valid for this registered application.
203 *
204 * @param groupId GroupId assigned to the subscriber
205 * @param appName Registered Application name
206 * @return true if valid groupId and false otherwise
207 */
208 private boolean validGroupId(EventSubscriberGroupId groupId,
209 String appName) {
210
211 checkNotNull(groupId);
212
213 ApplicationId appId = coreService.getAppId(appName);
214 EventSubscriberGroupId registeredGroupId = registeredApps.get(appId);
215 if (registeredGroupId.equals(groupId)) {
216 return true;
217 }
218
219 return false;
220 }
221
222 @Override
223 public void unsubscribe(EventSubscriber subscriber)
224 throws InvalidGroupIdException, InvalidApplicationException {
225
226 checkNotNull(subscriber);
227
228 if (!registeredApplication(subscriber.appName())) {
229 throw new InvalidApplicationException("Application is not "
230 + "registered to make this request.");
231 }
232
233 if (!validGroupId(subscriber.subscriberGroupId(),
234 subscriber.appName())) {
235 throw new InvalidGroupIdException("Incorrect group id in the request");
236 }
237
238 if (!eventSubscribed(subscriber)) {
239 log.error("No subscription to {} was found",
240 subscriber.eventType());
241 return;
242 }
243
244 // If this is the only subscriber listening for this event,
245 // stop the listener.
246 List<EventSubscriber> subscribers =
247 subscriptions.get(subscriber.eventType());
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700248
249 // update internal state.
250 subscribers.remove(subscriber);
251 subscriptions.put(subscriber.eventType(), subscribers);
252
253 log.info("Unsubscribed {} for {} events", subscriber.appName(),
254 subscriber.eventType());
255 }
256
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700257 @Override
258 public List<EventSubscriber> getEventSubscribers(Type type) {
259 return subscriptions.getOrDefault(type, ImmutableList.of());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700260 }
261
Shravan Ambatibb6b4452016-05-04 13:25:28 -0700262 /**
263 * Checks if the subscriber has already subscribed to the requested event
264 * type.
265 *
266 * @param subscriber the subscriber to a specific ONOS event
267 * @return true if subscriber has subscribed to the ONOS event
268 */
269 private boolean eventSubscribed(EventSubscriber subscriber) {
270
271 List<EventSubscriber> subscriberList =
272 subscriptions.get(subscriber.eventType());
273
274 if (subscriberList == null) {
275 return false;
276 }
277
278 return subscriberList.contains(subscriber);
279 }
280
281}