blob: a42e56a98f3244a4d0368541659b03a06e1c7f6e [file] [log] [blame]
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07001/**
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2016-present Open Networking Foundation
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070016package org.onosproject.kafkaintegration.kafka;
17
Shravan Ambati5a11e172016-07-21 15:55:28 -070018import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Shravan Ambatia4875d82017-01-09 13:06:51 -080021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Shravan Ambati5a11e172016-07-21 15:55:28 -070023import org.apache.kafka.clients.producer.ProducerRecord;
Shravan Ambatia4875d82017-01-09 13:06:51 -080024import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.kafkaintegration.api.KafkaConfigService;
Shravan Ambati4c6295e2016-12-20 14:53:06 -080028import org.onosproject.kafkaintegration.api.KafkaPublisherService;
Shravan Ambatia4875d82017-01-09 13:06:51 -080029import org.onosproject.kafkaintegration.api.KafkaEventStorageService;
Shravan Ambati5a11e172016-07-21 15:55:28 -070030import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
Shravan Ambatia4875d82017-01-09 13:06:51 -080032import org.onosproject.kafkaintegration.api.dto.OnosEvent;
Shravan Ambati5a11e172016-07-21 15:55:28 -070033
Shravan Ambatia4875d82017-01-09 13:06:51 -080034import java.util.Objects;
35import java.util.concurrent.ExecutionException;
36import java.util.concurrent.ScheduledExecutorService;
37import java.util.concurrent.Executors;
38import java.util.concurrent.TimeUnit;
39
40@Component(immediate = true)
41public class EventPublisher {
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070042
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
Shravan Ambatia4875d82017-01-09 13:06:51 -080045 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 protected LeadershipService leadershipService;
47
48 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
49 protected ClusterService clusterService;
50
51 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
52 protected KafkaConfigService kafkaConfigService;
53
54 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
55 protected KafkaEventStorageService kafkaStore;
56
57 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
58 protected KafkaPublisherService kafkaPublisher;
59
60 protected ScheduledExecutorService exService;
61
62 private static final String SUBSCRIBER_TOPIC = "WORK_QUEUE_SUBSCRIBER";
63
64 private NodeId localNodeId;
65
66 // Thread Scheduler Parameters
67 private final long delay = 0;
68 private final long period = 1;
69
70 private EventCollector eventCollector;
71
Shravan Ambati5a11e172016-07-21 15:55:28 -070072 @Activate
73 protected void activate() {
Shravan Ambatia4875d82017-01-09 13:06:51 -080074
75 leadershipService.runForLeadership(SUBSCRIBER_TOPIC);
76
77 localNodeId = clusterService.getLocalNode().id();
78
79 startCollector();
80
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -070081 log.info("Started");
82 }
83
Shravan Ambatia4875d82017-01-09 13:06:51 -080084 private void startCollector() {
85 exService = Executors.newSingleThreadScheduledExecutor();
86 eventCollector = new EventCollector();
87 exService.scheduleAtFixedRate(eventCollector, delay, period, TimeUnit.SECONDS);
88 }
89
Shravan Ambati5a11e172016-07-21 15:55:28 -070090 @Deactivate
91 protected void deactivate() {
Shravan Ambatia4875d82017-01-09 13:06:51 -080092 stopCollector();
Shravan Ambati5a11e172016-07-21 15:55:28 -070093 log.info("Stopped");
94 }
95
Shravan Ambatia4875d82017-01-09 13:06:51 -080096 private void stopCollector() {
97 exService.shutdown();
98 }
Shravan Ambati5a11e172016-07-21 15:55:28 -070099
Shravan Ambatia4875d82017-01-09 13:06:51 -0800100 private class EventCollector implements Runnable {
101
102 @Override
103 public void run() {
104
105 // do not allow to proceed without leadership
106 NodeId leaderNodeId = leadershipService.getLeader(SUBSCRIBER_TOPIC);
107 if (!Objects.equals(localNodeId, leaderNodeId)) {
108 log.debug("Not a Leader so cannot consume event");
109 return;
110 }
111
112 try {
113 OnosEvent onosEvent = kafkaStore.consumeEvent();
114
115 if (onosEvent != null) {
116 kafkaPublisher.send(new ProducerRecord<>(onosEvent.type().toString(),
117 onosEvent.subject())).get();
118
119 log.info("Event Type - {}, Subject {} sent successfully.",
120 onosEvent.type(), onosEvent.subject());
121 }
122 } catch (InterruptedException e1) {
123 log.error("Thread interupted");
124 Thread.currentThread().interrupt();
125 } catch (ExecutionException e2) {
126 log.error("Cannot publish data to Kafka - {}", e2);
127 }
Shravan Ambati5a11e172016-07-21 15:55:28 -0700128 }
Shravan Ambati5a11e172016-07-21 15:55:28 -0700129 }
130
Sanjana Agarwalcb4a3db2016-07-14 11:42:48 -0700131}