blob: 91e2028891ba80ca5c7cce86455c5846e3b25f7b [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.kafkaintegration.kafka;
18
Ray Milkeyd84f89b2018-08-17 14:54:17 -070019import org.osgi.service.component.annotations.Activate;
20import org.osgi.service.component.annotations.Component;
21import org.osgi.service.component.annotations.Deactivate;
22import org.osgi.service.component.annotations.Reference;
Shravan Ambatia4875d82017-01-09 13:06:51 -080023import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.NodeId;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070025import org.osgi.service.component.annotations.ReferenceCardinality;
Shravan Ambatia4875d82017-01-09 13:06:51 -080026import org.onosproject.cluster.LeadershipService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070027import org.onosproject.kafkaintegration.api.EventConversionService;
28import org.onosproject.kafkaintegration.api.EventSubscriptionService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080029import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070030import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070031import org.onosproject.net.device.DeviceEvent;
32import org.onosproject.net.device.DeviceListener;
33import org.onosproject.net.device.DeviceService;
34import org.onosproject.net.link.LinkEvent;
35import org.onosproject.net.link.LinkListener;
36import org.onosproject.net.link.LinkService;
Aaron Dunlapee83f262019-01-14 12:32:38 -060037import org.onosproject.net.host.HostEvent;
38import org.onosproject.net.host.HostListener;
39import org.onosproject.net.host.HostService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
Shravan Ambatia4875d82017-01-09 13:06:51 -080043import java.util.Objects;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070044import java.util.concurrent.ExecutorService;
45
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070046import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070047import static org.onlab.util.Tools.groupedThreads;
48import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.DEVICE;
49import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.LINK;
Aaron Dunlapee83f262019-01-14 12:32:38 -060050import static org.onosproject.kafkaintegration.api.dto.OnosEvent.Type.HOST;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070051
Shravan Ambati5a11e172016-07-21 15:55:28 -070052
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070053/**
54 * Encapsulates the behavior of monitoring various ONOS events.
55 * */
56@Component(immediate = true)
Shravan Ambati4c6295e2016-12-20 14:53:06 -080057public class EventListener {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070058 private final Logger log = LoggerFactory.getLogger(getClass());
59
Ray Milkeyd84f89b2018-08-17 14:54:17 -070060 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070061 protected EventSubscriptionService eventSubscriptionService;
62
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070064 protected EventConversionService eventConversionService;
65
Ray Milkeyd84f89b2018-08-17 14:54:17 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070067 protected DeviceService deviceService;
68
Ray Milkeyd84f89b2018-08-17 14:54:17 -070069 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070070 protected LinkService linkService;
71
Ray Milkeyd84f89b2018-08-17 14:54:17 -070072 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Aaron Dunlapee83f262019-01-14 12:32:38 -060073 protected HostService hostService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080076 protected KafkaEventStorageService kafkaStoreService;
77
Ray Milkeyd84f89b2018-08-17 14:54:17 -070078 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080079 protected LeadershipService leadershipService;
80
Ray Milkeyd84f89b2018-08-17 14:54:17 -070081 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Shravan Ambatia4875d82017-01-09 13:06:51 -080082 protected ClusterService clusterService;
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070083
84 private final DeviceListener deviceListener = new InternalDeviceListener();
85 private final LinkListener linkListener = new InternalLinkListener();
Aaron Dunlapee83f262019-01-14 12:32:38 -060086 private final HostListener hostListener = new InternalHostListener();
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070087
88 protected ExecutorService eventExecutor;
89
Shravan Ambatia4875d82017-01-09 13:06:51 -080090 private static final String PUBLISHER_TOPIC = "WORK_QUEUE_PUBLISHER";
91
92 private NodeId localNodeId;
93
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070094 @Activate
Shravan Ambati5a11e172016-07-21 15:55:28 -070095 protected void activate() {
96
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070097 eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/onosEvents", "events-%d", log));
98 deviceService.addListener(deviceListener);
99 linkService.addListener(linkListener);
Aaron Dunlapee83f262019-01-14 12:32:38 -0600100 hostService.addListener(hostListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700101
Shravan Ambatia4875d82017-01-09 13:06:51 -0800102 localNodeId = clusterService.getLocalNode().id();
103
104 leadershipService.runForLeadership(PUBLISHER_TOPIC);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700105
106 log.info("Started");
107 }
108
109 @Deactivate
110 protected void deactivate() {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700111 deviceService.removeListener(deviceListener);
112 linkService.removeListener(linkListener);
Aaron Dunlapee83f262019-01-14 12:32:38 -0600113 hostService.removeListener(hostListener);
Shravan Ambati5a11e172016-07-21 15:55:28 -0700114
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700115 eventExecutor.shutdownNow();
116 eventExecutor = null;
117
118 log.info("Stopped");
119 }
120
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700121 private class InternalDeviceListener implements DeviceListener {
122
123 @Override
124 public void event(DeviceEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700125
Shravan Ambatia4875d82017-01-09 13:06:51 -0800126 // do not allow to proceed without leadership
127 NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
128 if (!Objects.equals(localNodeId, leaderNodeId)) {
129 log.debug("Not a Leader, cannot publish!");
130 return;
131 }
132
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700133 if (!eventSubscriptionService.getEventSubscribers(DEVICE).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700134 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700135 eventExecutor.execute(() -> {
Shravan Ambatia4875d82017-01-09 13:06:51 -0800136 kafkaStoreService.publishEvent(onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700137 });
Shravan Ambatia4875d82017-01-09 13:06:51 -0800138 log.debug("Pushed event {} to kafka storage", onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700139 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800140
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700141 }
142 }
143
144 private class InternalLinkListener implements LinkListener {
145
146 @Override
147 public void event(LinkEvent event) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700148
Shravan Ambatia4875d82017-01-09 13:06:51 -0800149 // do not allow to proceed without leadership
150 NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
151 if (!Objects.equals(localNodeId, leaderNodeId)) {
152 log.debug("Not a Leader, cannot publish!");
153 return;
154 }
155
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700156 if (!eventSubscriptionService.getEventSubscribers(LINK).isEmpty()) {
Shravan Ambati5a11e172016-07-21 15:55:28 -0700157 OnosEvent onosEvent = eventConversionService.convertEvent(event);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700158 eventExecutor.execute(() -> {
Shravan Ambatia4875d82017-01-09 13:06:51 -0800159 kafkaStoreService.publishEvent(onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700160 });
Shravan Ambatia4875d82017-01-09 13:06:51 -0800161 log.debug("Pushed event {} to kafka storage", onosEvent);
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700162 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800163
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700164 }
165 }
Aaron Dunlapee83f262019-01-14 12:32:38 -0600166
167 private class InternalHostListener implements HostListener {
168
169 @Override
170 public void event(HostEvent event) {
171
172 // do not allow to proceed without leadership
173 NodeId leaderNodeId = leadershipService.getLeader(PUBLISHER_TOPIC);
174 if (!Objects.equals(localNodeId, leaderNodeId)) {
175 log.debug("Not a Leader, cannot publish!");
176 return;
177 }
178
179 if (!eventSubscriptionService.getEventSubscribers(HOST).isEmpty()) {
180 OnosEvent onosEvent = eventConversionService.convertEvent(event);
181 eventExecutor.execute(() -> {
182 kafkaStoreService.publishEvent(onosEvent);
183 });
184 log.debug("Pushed event {} to kafka storage", onosEvent);
185 }
186
187 }
188 }
Shravan Ambatia4875d82017-01-09 13:06:51 -0800189}