blob: 18baccbfd37b07038c38738cabe474ed496a6059 [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.kafkaintegration.kafka;
18
Ray Milkeyd84f89b2018-08-17 14:54:17 -070019import org.osgi.service.component.annotations.Activate;
20import org.osgi.service.component.annotations.Component;
21import org.osgi.service.component.annotations.Deactivate;
22import org.osgi.service.component.annotations.Reference;
Shravan Ambatia4875d82017-01-09 13:06:51 -080023import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.NodeId;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070025import org.osgi.service.component.annotations.ReferenceCardinality;
Shravan Ambatia4875d82017-01-09 13:06:51 -080026import org.onosproject.cluster.LeadershipService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070027import org.onosproject.kafkaintegration.api.EventConversionService;
28import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080029import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070030import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070031import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceListener;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.link.LinkEvent;
35import org.onosproject.net.link.LinkListener;
36import org.onosproject.net.link.LinkService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070037import org.slf4j.Logger;
38import org.slf4j.LoggerFactory;
39
Shravan Ambatia4875d82017-01-09 13:06:51 -080040import java.util.Objects;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070041import java.util.concurrent.ExecutorService;
42
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070043import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070044import static org.onlab.util.Tools.groupedThreads;
45import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
46import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
47
Shravan Ambati5a11e172016-07-21 15:55:28 -070048
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070049/**
50 * Encapsulates the behavior of monitoring various ONOS events.
51 * */
52@Component(immediate = true)
Shravan Ambati4c6295e2016-12-20 14:53:06 -080053public class EventListener {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070054 private final Logger log = LoggerFactory.getLogger(getClass());
55
Ray Milkeyd84f89b2018-08-17 14:54:17 -070056 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070057 protected EventSubscriptionService eventSubscriptionService;
58
Ray Milkeyd84f89b2018-08-17 14:54:17 -070059 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070060 protected EventConversionService eventConversionService;
61
Ray Milkeyd84f89b2018-08-17 14:54:17 -070062 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070063 protected DeviceService deviceService;
64
Ray Milkeyd84f89b2018-08-17 14:54:17 -070065 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070066 protected LinkService linkService;
67
Ray Milkeyd84f89b2018-08-17 14:54:17 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080069 protected KafkaEventStorageService kafkaStoreService;
70
Ray Milkeyd84f89b2018-08-17 14:54:17 -070071 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080072 protected LeadershipService leadershipService;
73
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080075 protected ClusterService clusterService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070076
77 private final DeviceListener deviceListener = new InternalDeviceListener();
78 private final LinkListener linkListener = new InternalLinkListener();
79
80 protected ExecutorService eventExecutor;
81
Shravan Ambatia4875d82017-01-09 13:06:51 -080082 private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
83
84 private NodeId localNodeId;
85
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070086 @Activate
Shravan Ambati5a11e172016-07-21 15:55:28 -070087 protected void activate() {
88
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070089 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
90 deviceService.addListener(deviceListener);
91 linkService.addListener(linkListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -070092
Shravan Ambatia4875d82017-01-09 13:06:51 -080093 localNodeId = clusterService.getLocalNode().id();
94
95 leadershipService.runForLeadership(PUBLISHER_TOPIC);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070096
97 log.info("Started");
98 }
99
100 @Deactivate
101 protected void deactivate() {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700102 deviceService.removeListener(deviceListener);
103 linkService.removeListener(linkListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700104
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700105 eventExecutor.shutdownNow();
106 eventExecutor = null;
107
108 log.info("Stopped");
109 }
110
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700111 private class InternalDeviceListener implements DeviceListener {
112
113 @Override
114 public void event(DeviceEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700115
Shravan Ambatia4875d82017-01-09 13:06:51 -0800116 // do not allow to proceed without leadership
117 NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
118 if (!Objects.equals(localNodeId, leaderNodeId)) {
119 log.debug("Not a Leader, cannot publish!");
120 return;
121 }
122
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700123 if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700124 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700125 eventExecutor.execute(() -> {
Shravan Ambatia4875d82017-01-09 13:06:51 -0800126 kafkaStoreService.publishEvent(onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700127 });
Shravan Ambatia4875d82017-01-09 13:06:51 -0800128 log.debug("Pushed event {} to kafka storage", onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700129 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800130
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700131 }
132 }
133
134 private class InternalLinkListener implements LinkListener {
135
136 @Override
137 public void event(LinkEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700138
Shravan Ambatia4875d82017-01-09 13:06:51 -0800139 // do not allow to proceed without leadership
140 NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
141 if (!Objects.equals(localNodeId, leaderNodeId)) {
142 log.debug("Not a Leader, cannot publish!");
143 return;
144 }
145
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700146 if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700147 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700148 eventExecutor.execute(() -> {
Shravan Ambatia4875d82017-01-09 13:06:51 -0800149 kafkaStoreService.publishEvent(onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700150 });
Shravan Ambatia4875d82017-01-09 13:06:51 -0800151 log.debug("Pushed event {} to kafka storage", onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700152 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800153
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700154 }
155 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800156}