blob: 4bc0450bd65d46b56a191729b06167ff01ed925b [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.kafkaintegration.kafka;
18
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070022import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.kafka.clients.producer.ProducerRecord;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070025import org.onosproject.kafkaintegration.api.EventConversionService;
26import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070027import org.onosproject.kafkaintegration.api.KafkaProducerService;
28import org.onosproject.kafkaintegration.api.KafkaConfigService;
29import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070030import org.onosproject.net.device.DeviceEvent;
31import org.onosproject.net.device.DeviceListener;
32import org.onosproject.net.device.DeviceService;
33import org.onosproject.net.link.LinkEvent;
34import org.onosproject.net.link.LinkListener;
35import org.onosproject.net.link.LinkService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070039import java.util.concurrent.ExecutionException;
40import java.util.concurrent.ExecutorService;
41
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070042import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070043import static org.onlab.util.Tools.groupedThreads;
44import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
45import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
46
Shravan Ambati5a11e172016-07-21 15:55:28 -070047
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070048/**
49 * Encapsulates the behavior of monitoring various ONOS events.
50 * */
51@Component(immediate = true)
52public class EventMonitor {
53 private final Logger log = LoggerFactory.getLogger(getClass());
54
55 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
56 protected EventSubscriptionService eventSubscriptionService;
57
58 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
59 protected EventConversionService eventConversionService;
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Shravan Ambati5a11e172016-07-21 15:55:28 -070062 protected KafkaProducerService kafkaProducer;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070065 protected DeviceService deviceService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected LinkService linkService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Shravan Ambati5a11e172016-07-21 15:55:28 -070071 protected KafkaConfigService kafkaConfigService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070072
73 private final DeviceListener deviceListener = new InternalDeviceListener();
74 private final LinkListener linkListener = new InternalLinkListener();
75
76 protected ExecutorService eventExecutor;
77
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070078 @Activate
Shravan Ambati5a11e172016-07-21 15:55:28 -070079 protected void activate() {
80
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070081 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
82 deviceService.addListener(deviceListener);
83 linkService.addListener(linkListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -070084
85 kafkaProducer.start(kafkaConfigService.getConfigParams());
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070086
87 log.info("Started");
88 }
89
90 @Deactivate
91 protected void deactivate() {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070092 deviceService.removeListener(deviceListener);
93 linkService.removeListener(linkListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -070094
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070095 eventExecutor.shutdownNow();
96 eventExecutor = null;
97
Shravan Ambati5a11e172016-07-21 15:55:28 -070098 kafkaProducer.stop();
99
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700100 log.info("Stopped");
101 }
102
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700103 private class InternalDeviceListener implements DeviceListener {
104
105 @Override
106 public void event(DeviceEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700107
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700108 if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700109 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700110 eventExecutor.execute(() -> {
111 try {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700112 kafkaProducer.send(new ProducerRecord<>(DEVICE.toString(),
113 onosEvent.subject().toByteArray())).get();
114
115 log.debug("Event Type - {}, Subject {} sent successfully.",
116 DEVICE, onosEvent.subject());
117
118 } catch (InterruptedException e1) {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700119 Thread.currentThread().interrupt();
Shravan Ambati5a11e172016-07-21 15:55:28 -0700120 } catch (ExecutionException e2) {
121 log.error("Exception thrown {}", e2);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700122 }
123 });
124 } else {
125 log.debug("No device listeners");
126 }
127 }
128 }
129
130 private class InternalLinkListener implements LinkListener {
131
132 @Override
133 public void event(LinkEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700134
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700135 if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700136 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700137 eventExecutor.execute(() -> {
138 try {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700139 kafkaProducer.send(new ProducerRecord<>(LINK.toString(),
140 onosEvent.subject().toByteArray())).get();
141
142 log.debug("Event Type - {}, Subject {} sent successfully.",
143 LINK, onosEvent.subject());
144
145 } catch (InterruptedException e1) {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700146 Thread.currentThread().interrupt();
Shravan Ambati5a11e172016-07-21 15:55:28 -0700147 } catch (ExecutionException e2) {
148 log.error("Exception thrown {}", e2);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700149 }
150 });
151 } else {
152 log.debug("No link listeners");
153 }
154 }
155 }
156}