blob: e895e469d58ccf817141f7cecd558b0b70302372 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li6803ccd2018-06-08 09:26:09 +090018import org.apache.kafka.clients.producer.KafkaProducer;
19import org.apache.kafka.clients.producer.Producer;
20import org.apache.kafka.clients.producer.ProducerRecord;
21import org.apache.kafka.clients.producer.RecordMetadata;
22import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lid1ce10a2018-06-12 13:47:23 +090023import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li6803ccd2018-06-08 09:26:09 +090024import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
25import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070026import org.osgi.service.component.annotations.Activate;
27import org.osgi.service.component.annotations.Component;
28import org.osgi.service.component.annotations.Deactivate;
29import org.osgi.service.component.annotations.Reference;
30import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090031import org.slf4j.Logger;
32import org.slf4j.LoggerFactory;
33
34import java.util.Properties;
35import java.util.concurrent.Future;
36
37/**
38 * Kafka telemetry manager.
39 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070040@Component(immediate = true, service = KafkaTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090041public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
42
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
45 private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
46 private static final String RETRIES = "retries";
47 private static final String ACKS = "acks";
48 private static final String BATCH_SIZE = "batch.size";
49 private static final String LINGER_MS = "linger.ms";
50 private static final String MEMORY_BUFFER = "buffer.memory";
51 private static final String KEY_SERIALIZER = "key.serializer";
52 private static final String VALUE_SERIALIZER = "value.serializer";
53
Ray Milkeyd84f89b2018-08-17 14:54:17 -070054 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090055 protected OpenstackTelemetryService openstackTelemetryService;
56
Jian Li6803ccd2018-06-08 09:26:09 +090057 private Producer<String, byte[]> producer = null;
58
59 @Activate
60 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090061
62 openstackTelemetryService.addTelemetryService(this);
63
Jian Li6803ccd2018-06-08 09:26:09 +090064 log.info("Started");
65 }
66
67 @Deactivate
68 protected void deactivate() {
69 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090070
71 openstackTelemetryService.removeTelemetryService(this);
72
Jian Li6803ccd2018-06-08 09:26:09 +090073 log.info("Stopped");
74 }
75
76 @Override
77 public void start(TelemetryConfig config) {
78 if (producer != null) {
79 log.info("Kafka producer has already been started");
80 return;
81 }
82
83 KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
84
85 StringBuilder kafkaServerBuilder = new StringBuilder();
86 kafkaServerBuilder.append(kafkaConfig.address());
87 kafkaServerBuilder.append(":");
88 kafkaServerBuilder.append(kafkaConfig.port());
89
90 // Configure Kafka server properties
91 Properties prop = new Properties();
92 prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
93 prop.put(RETRIES, kafkaConfig.retries());
94 prop.put(ACKS, kafkaConfig.requiredAcks());
95 prop.put(BATCH_SIZE, kafkaConfig.batchSize());
96 prop.put(LINGER_MS, kafkaConfig.lingerMs());
97 prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
98 prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
99 prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
100
101 producer = new KafkaProducer<>(prop);
102 log.info("Kafka producer has Started");
103 }
104
105 @Override
106 public void stop() {
107 if (producer != null) {
108 producer.close();
109 producer = null;
110 }
111
112 log.info("Kafka producer has Stopped");
113 }
114
115 @Override
116 public void restart(TelemetryConfig config) {
117 stop();
118 start(config);
119 }
120
121 @Override
122 public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900123
124 if (producer == null) {
Jian Lia4947682018-07-07 14:53:32 +0900125 log.debug("Kafka telemetry service has not been enabled!");
Jian Lid1ce10a2018-06-12 13:47:23 +0900126 return null;
127 }
128
Jian Li85573f42018-06-27 22:29:14 +0900129 log.debug("Send telemetry record to kafka server...");
Jian Li6803ccd2018-06-08 09:26:09 +0900130 return producer.send(record);
131 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900132
133 @Override
134 public boolean isRunning() {
135 return producer != null;
136 }
Jian Li6803ccd2018-06-08 09:26:09 +0900137}