blob: d1de54a52c254d711e968ec941f193bd55f0f7e7 [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
18import org.apache.felix.scr.annotations.Activate;
19import org.apache.felix.scr.annotations.Component;
20import org.apache.felix.scr.annotations.Deactivate;
Jian Lid1ce10a2018-06-12 13:47:23 +090021import org.apache.felix.scr.annotations.Reference;
22import org.apache.felix.scr.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090023import org.apache.felix.scr.annotations.Service;
24import org.apache.kafka.clients.producer.KafkaProducer;
25import org.apache.kafka.clients.producer.Producer;
26import org.apache.kafka.clients.producer.ProducerRecord;
27import org.apache.kafka.clients.producer.RecordMetadata;
28import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lid1ce10a2018-06-12 13:47:23 +090029import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li6803ccd2018-06-08 09:26:09 +090030import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
31import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
32import org.slf4j.Logger;
33import org.slf4j.LoggerFactory;
34
35import java.util.Properties;
36import java.util.concurrent.Future;
37
38/**
39 * Kafka telemetry manager.
40 */
41@Component(immediate = true)
42@Service
43public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
44
45 private final Logger log = LoggerFactory.getLogger(getClass());
46
47 private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
48 private static final String RETRIES = "retries";
49 private static final String ACKS = "acks";
50 private static final String BATCH_SIZE = "batch.size";
51 private static final String LINGER_MS = "linger.ms";
52 private static final String MEMORY_BUFFER = "buffer.memory";
53 private static final String KEY_SERIALIZER = "key.serializer";
54 private static final String VALUE_SERIALIZER = "value.serializer";
55
Jian Lid1ce10a2018-06-12 13:47:23 +090056 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
57 protected OpenstackTelemetryService openstackTelemetryService;
58
Jian Li6803ccd2018-06-08 09:26:09 +090059 private Producer<String, byte[]> producer = null;
60
61 @Activate
62 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090063
64 openstackTelemetryService.addTelemetryService(this);
65
Jian Li6803ccd2018-06-08 09:26:09 +090066 log.info("Started");
67 }
68
69 @Deactivate
70 protected void deactivate() {
71 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090072
73 openstackTelemetryService.removeTelemetryService(this);
74
Jian Li6803ccd2018-06-08 09:26:09 +090075 log.info("Stopped");
76 }
77
78 @Override
79 public void start(TelemetryConfig config) {
80 if (producer != null) {
81 log.info("Kafka producer has already been started");
82 return;
83 }
84
85 KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
86
87 StringBuilder kafkaServerBuilder = new StringBuilder();
88 kafkaServerBuilder.append(kafkaConfig.address());
89 kafkaServerBuilder.append(":");
90 kafkaServerBuilder.append(kafkaConfig.port());
91
92 // Configure Kafka server properties
93 Properties prop = new Properties();
94 prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
95 prop.put(RETRIES, kafkaConfig.retries());
96 prop.put(ACKS, kafkaConfig.requiredAcks());
97 prop.put(BATCH_SIZE, kafkaConfig.batchSize());
98 prop.put(LINGER_MS, kafkaConfig.lingerMs());
99 prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
100 prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
101 prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
102
103 producer = new KafkaProducer<>(prop);
104 log.info("Kafka producer has Started");
105 }
106
107 @Override
108 public void stop() {
109 if (producer != null) {
110 producer.close();
111 producer = null;
112 }
113
114 log.info("Kafka producer has Stopped");
115 }
116
117 @Override
118 public void restart(TelemetryConfig config) {
119 stop();
120 start(config);
121 }
122
123 @Override
124 public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900125
126 if (producer == null) {
127 log.warn("Kafka telemetry service has not been enabled!");
128 return null;
129 }
130
Jian Li85573f42018-06-27 22:29:14 +0900131 log.debug("Send telemetry record to kafka server...");
Jian Li6803ccd2018-06-08 09:26:09 +0900132 return producer.send(record);
133 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900134
135 @Override
136 public boolean isRunning() {
137 return producer != null;
138 }
Jian Li6803ccd2018-06-08 09:26:09 +0900139}