blob: e935111fcf813063e1ea0952480fa455cb4d257d [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
21import org.apache.felix.scr.annotations.Service;
22import org.apache.kafka.clients.producer.KafkaProducer;
23import org.apache.kafka.clients.producer.Producer;
24import org.apache.kafka.clients.producer.ProducerRecord;
25import org.apache.kafka.clients.producer.RecordMetadata;
26import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
27import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
28import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32import java.util.Properties;
33import java.util.concurrent.Future;
34
35/**
36 * Kafka telemetry manager.
37 */
38@Component(immediate = true)
39@Service
40public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
41
42 private final Logger log = LoggerFactory.getLogger(getClass());
43
44 private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
45 private static final String RETRIES = "retries";
46 private static final String ACKS = "acks";
47 private static final String BATCH_SIZE = "batch.size";
48 private static final String LINGER_MS = "linger.ms";
49 private static final String MEMORY_BUFFER = "buffer.memory";
50 private static final String KEY_SERIALIZER = "key.serializer";
51 private static final String VALUE_SERIALIZER = "value.serializer";
52
53 private Producer<String, byte[]> producer = null;
54
55 @Activate
56 protected void activate() {
57 log.info("Started");
58 }
59
60 @Deactivate
61 protected void deactivate() {
62 stop();
63 log.info("Stopped");
64 }
65
66 @Override
67 public void start(TelemetryConfig config) {
68 if (producer != null) {
69 log.info("Kafka producer has already been started");
70 return;
71 }
72
73 KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
74
75 StringBuilder kafkaServerBuilder = new StringBuilder();
76 kafkaServerBuilder.append(kafkaConfig.address());
77 kafkaServerBuilder.append(":");
78 kafkaServerBuilder.append(kafkaConfig.port());
79
80 // Configure Kafka server properties
81 Properties prop = new Properties();
82 prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
83 prop.put(RETRIES, kafkaConfig.retries());
84 prop.put(ACKS, kafkaConfig.requiredAcks());
85 prop.put(BATCH_SIZE, kafkaConfig.batchSize());
86 prop.put(LINGER_MS, kafkaConfig.lingerMs());
87 prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
88 prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
89 prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
90
91 producer = new KafkaProducer<>(prop);
92 log.info("Kafka producer has Started");
93 }
94
95 @Override
96 public void stop() {
97 if (producer != null) {
98 producer.close();
99 producer = null;
100 }
101
102 log.info("Kafka producer has Stopped");
103 }
104
105 @Override
106 public void restart(TelemetryConfig config) {
107 stop();
108 start(config);
109 }
110
111 @Override
112 public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
113 return producer.send(record);
114 }
115}