blob: b2e0ad11436d734d4025d83b66d2b35a6786690a [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Lida8867f2019-01-31 01:17:36 +090018import com.fasterxml.jackson.databind.node.ArrayNode;
19import com.fasterxml.jackson.databind.node.ObjectNode;
Jian Li69600e02018-12-24 13:21:18 +090020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li6803ccd2018-06-08 09:26:09 +090022import org.apache.kafka.clients.producer.KafkaProducer;
23import org.apache.kafka.clients.producer.Producer;
24import org.apache.kafka.clients.producer.ProducerRecord;
25import org.apache.kafka.clients.producer.RecordMetadata;
Jian Lida8867f2019-01-31 01:17:36 +090026import org.onosproject.codec.CodecContext;
27import org.onosproject.codec.CodecService;
28import org.onosproject.codec.JsonCodec;
Jian Li69600e02018-12-24 13:21:18 +090029import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090030import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lida8867f2019-01-31 01:17:36 +090031import org.onosproject.openstacktelemetry.api.LinkInfo;
Jian Lid1ce10a2018-06-12 13:47:23 +090032import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090033import org.onosproject.openstacktelemetry.api.TelemetryCodec;
34import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090035import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
36import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Lida8867f2019-01-31 01:17:36 +090037import org.onosproject.rest.AbstractWebResource;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070038import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jian Li69600e02018-12-24 13:21:18 +090046import java.nio.ByteBuffer;
47import java.util.Map;
Jian Li6803ccd2018-06-08 09:26:09 +090048import java.util.Properties;
Jian Li69600e02018-12-24 13:21:18 +090049import java.util.Set;
Jian Li6803ccd2018-06-08 09:26:09 +090050import java.util.concurrent.Future;
51
Jian Lib3b01682019-01-09 17:10:12 +090052import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
53import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
54import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
55import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
56import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
57import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
Daniele Moro73f0ae82020-02-03 22:49:13 -080058import static org.apache.kafka.clients.producer.ProducerConfig.MAX_BLOCK_MS_CONFIG;
Jian Lib3b01682019-01-09 17:10:12 +090059import static org.apache.kafka.clients.producer.ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG;
60import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
61import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
Daniele Moro73f0ae82020-02-03 22:49:13 -080062import static org.apache.kafka.clients.producer.ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG;
Jian Lib3b01682019-01-09 17:10:12 +090063import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
Jian Li69600e02018-12-24 13:21:18 +090064import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
65import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
Jian Li667c6eb2019-01-07 23:01:12 +090066import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
Jian Li69600e02018-12-24 13:21:18 +090067import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
Jian Lida8867f2019-01-31 01:17:36 +090068import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
Jian Li667c6eb2019-01-07 23:01:12 +090069import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
Jian Li69600e02018-12-24 13:21:18 +090070
Jian Li6803ccd2018-06-08 09:26:09 +090071/**
72 * Kafka telemetry manager.
73 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074@Component(immediate = true, service = KafkaTelemetryAdminService.class)
Jian Lida8867f2019-01-31 01:17:36 +090075public class KafkaTelemetryManager extends AbstractWebResource
76 implements KafkaTelemetryAdminService {
Jian Li6803ccd2018-06-08 09:26:09 +090077
78 private final Logger log = LoggerFactory.getLogger(getClass());
79
Jian Lib3b01682019-01-09 17:10:12 +090080 private static final int METADATA_FETCH_TIMEOUT_VAL = 300;
81 private static final int TIMEOUT_VAL = 300;
82 private static final int RETRY_BACKOFF_MS_VAL = 10000;
83 private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
Jian Li6803ccd2018-06-08 09:26:09 +090084
Jian Lida8867f2019-01-31 01:17:36 +090085 private static final String LINK_INFOS = "linkInfos";
86
87 private static final String BYTE_ARRAY_SERIALIZER =
88 "org.apache.kafka.common.serialization.ByteArraySerializer";
89 private static final String STRING_SERIALIZER =
90 "org.apache.kafka.common.serialization.StringSerializer";
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected CodecService codecService;
94
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090096 protected OpenstackTelemetryService openstackTelemetryService;
97
Jian Li69600e02018-12-24 13:21:18 +090098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected TelemetryConfigService telemetryConfigService;
100
Jian Lida8867f2019-01-31 01:17:36 +0900101 private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
102 private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
Jian Li6803ccd2018-06-08 09:26:09 +0900103
104 @Activate
105 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +0900106
107 openstackTelemetryService.addTelemetryService(this);
108
Jian Li6803ccd2018-06-08 09:26:09 +0900109 log.info("Started");
110 }
111
112 @Deactivate
113 protected void deactivate() {
Jian Lia61e0b62018-12-28 19:10:10 +0900114 stopAll();
Jian Lid1ce10a2018-06-12 13:47:23 +0900115
116 openstackTelemetryService.removeTelemetryService(this);
117
Jian Li6803ccd2018-06-08 09:26:09 +0900118 log.info("Stopped");
119 }
120
121 @Override
Jian Li69600e02018-12-24 13:21:18 +0900122 public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900123
Jian Li85573f42018-06-27 22:29:14 +0900124 log.debug("Send telemetry record to kafka server...");
Jian Li69600e02018-12-24 13:21:18 +0900125 Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
Jian Lida8867f2019-01-31 01:17:36 +0900126
Jian Li69d4fe92019-02-01 10:14:34 +0900127 if (byteProducers == null || byteProducers.isEmpty()) {
128 log.debug("Byte producer is empty!");
129 } else {
130 byteProducers.forEach((k, v) -> {
131 TelemetryConfig config = telemetryConfigService.getConfig(k);
132 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
Jian Li69600e02018-12-24 13:21:18 +0900133
Jian Li69d4fe92019-02-01 10:14:34 +0900134 if (kafkaConfig != null &&
135 BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
136 try {
137 Class codecClazz = Class.forName(kafkaConfig.codec());
138 TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
Jian Li69600e02018-12-24 13:21:18 +0900139
Jian Li69d4fe92019-02-01 10:14:34 +0900140 ByteBuffer buffer = codec.encode(flowInfos);
141 ProducerRecord record = new ProducerRecord<>(
142 kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
143 futureSet.add(v.send(record));
144 } catch (ClassNotFoundException |
145 IllegalAccessException | InstantiationException e) {
146 log.warn("Failed to send telemetry record due to {}", e);
147 }
Jian Lida8867f2019-01-31 01:17:36 +0900148 }
Jian Li69d4fe92019-02-01 10:14:34 +0900149 });
150 }
Jian Lida8867f2019-01-31 01:17:36 +0900151
Jian Li69d4fe92019-02-01 10:14:34 +0900152 if (stringProducers == null || stringProducers.isEmpty()) {
153 log.debug("String producer is empty!");
154 } else {
155 stringProducers.forEach((k, v) -> {
156 TelemetryConfig config = telemetryConfigService.getConfig(k);
157 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
Jian Lida8867f2019-01-31 01:17:36 +0900158
Jian Li69d4fe92019-02-01 10:14:34 +0900159 if (kafkaConfig != null &&
160 STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
Jian Lida8867f2019-01-31 01:17:36 +0900161
Jian Li69d4fe92019-02-01 10:14:34 +0900162 // TODO: this is a workaround to convert flowInfo to linkInfo
163 // need to find a better solution
Jian Lida8867f2019-01-31 01:17:36 +0900164
Jian Li69d4fe92019-02-01 10:14:34 +0900165 Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
Jian Lida8867f2019-01-31 01:17:36 +0900166
Jian Li69d4fe92019-02-01 10:14:34 +0900167 if (!linkInfos.isEmpty()) {
168 ProducerRecord record = new ProducerRecord<>(
169 kafkaConfig.topic(), kafkaConfig.key(),
170 encodeStrings(linkInfos, this,
171 kafkaConfig.codec()).toString());
172 futureSet.add(v.send(record));
173 }
Jian Lida8867f2019-01-31 01:17:36 +0900174 }
Jian Li69d4fe92019-02-01 10:14:34 +0900175 });
176 }
177
Jian Li69600e02018-12-24 13:21:18 +0900178 return futureSet;
Jian Li6803ccd2018-06-08 09:26:09 +0900179 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900180
181 @Override
182 public boolean isRunning() {
Jian Lida8867f2019-01-31 01:17:36 +0900183 return !byteProducers.isEmpty();
Jian Lid1ce10a2018-06-12 13:47:23 +0900184 }
Jian Lia61e0b62018-12-28 19:10:10 +0900185
186 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900187 public boolean start(String name) {
188 boolean success = false;
Jian Lia61e0b62018-12-28 19:10:10 +0900189 TelemetryConfig config = telemetryConfigService.getConfig(name);
190 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
191
Jian Li667c6eb2019-01-07 23:01:12 +0900192 if (kafkaConfig != null && !config.name().equals(KAFKA_SCHEME) &&
193 config.status() == ENABLED) {
Jian Lia61e0b62018-12-28 19:10:10 +0900194 StringBuilder kafkaServerBuilder = new StringBuilder();
195 kafkaServerBuilder.append(kafkaConfig.address());
196 kafkaServerBuilder.append(":");
197 kafkaServerBuilder.append(kafkaConfig.port());
198
199 // Configure Kafka server properties
200 Properties prop = new Properties();
Jian Lib3b01682019-01-09 17:10:12 +0900201 prop.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServerBuilder.toString());
202 prop.put(RETRIES_CONFIG, kafkaConfig.retries());
203 prop.put(ACKS_CONFIG, kafkaConfig.requiredAcks());
204 prop.put(BATCH_SIZE_CONFIG, kafkaConfig.batchSize());
205 prop.put(LINGER_MS_CONFIG, kafkaConfig.lingerMs());
206 prop.put(BUFFER_MEMORY_CONFIG, kafkaConfig.memoryBuffer());
207 prop.put(KEY_SERIALIZER_CLASS_CONFIG, kafkaConfig.keySerializer());
208 prop.put(VALUE_SERIALIZER_CLASS_CONFIG, kafkaConfig.valueSerializer());
Daniele Moro73f0ae82020-02-03 22:49:13 -0800209 prop.put(MAX_BLOCK_MS_CONFIG, METADATA_FETCH_TIMEOUT_VAL);
210 prop.put(REQUEST_TIMEOUT_MS_CONFIG, TIMEOUT_VAL);
Jian Lib3b01682019-01-09 17:10:12 +0900211 prop.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_VAL);
212 prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
Jian Lia61e0b62018-12-28 19:10:10 +0900213
Jian Li667c6eb2019-01-07 23:01:12 +0900214 if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
Jian Lida8867f2019-01-31 01:17:36 +0900215 if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
216 byteProducers.put(name, new KafkaProducer<>(prop));
217 }
218
219 if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
220 stringProducers.put(name, new KafkaProducer<>(prop));
221 }
222
Jian Li667c6eb2019-01-07 23:01:12 +0900223 success = true;
224 } else {
225 log.warn("Unable to connect to {}:{}, " +
226 "please check the connectivity manually",
227 kafkaConfig.address(), kafkaConfig.port());
228 }
Jian Lia61e0b62018-12-28 19:10:10 +0900229 }
Jian Li667c6eb2019-01-07 23:01:12 +0900230
231 return success;
Jian Lia61e0b62018-12-28 19:10:10 +0900232 }
233
234 @Override
235 public void stop(String name) {
Jian Lida8867f2019-01-31 01:17:36 +0900236 Producer<String, byte[]> byteProducer = byteProducers.get(name);
237 Producer<String, String> stringProducer = stringProducers.get(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900238
Jian Lida8867f2019-01-31 01:17:36 +0900239 if (byteProducer != null) {
240 byteProducer.close();
241 byteProducers.remove(name);
242 }
243
244 if (stringProducer != null) {
245 stringProducer.close();
246 stringProducers.remove(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900247 }
248 }
249
250 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900251 public boolean restart(String name) {
Jian Lia61e0b62018-12-28 19:10:10 +0900252 stop(name);
Jian Li667c6eb2019-01-07 23:01:12 +0900253 return start(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900254 }
255
256 @Override
257 public void startAll() {
258 telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
259 log.info("Kafka producer has Started");
260 }
261
262 @Override
263 public void stopAll() {
Jian Lida8867f2019-01-31 01:17:36 +0900264 if (!byteProducers.isEmpty()) {
265 byteProducers.values().forEach(Producer::close);
Jian Lia61e0b62018-12-28 19:10:10 +0900266 }
267
Jian Lida8867f2019-01-31 01:17:36 +0900268 byteProducers.clear();
269
270 if (!stringProducers.isEmpty()) {
271 stringProducers.values().forEach(Producer::close);
272 }
273
274 stringProducers.clear();
Jian Lia61e0b62018-12-28 19:10:10 +0900275
276 log.info("Kafka producer has Stopped");
277 }
278
279 @Override
280 public void restartAll() {
281 stopAll();
282 startAll();
283 }
Jian Lida8867f2019-01-31 01:17:36 +0900284
285 private ObjectNode encodeStrings(Set<LinkInfo> infos,
286 CodecContext context, String codecName) {
287 ObjectNode root = context.mapper().createObjectNode();
288 ArrayNode array = context.mapper().createArrayNode();
289 try {
290 Class codecClazz = Class.forName(codecName);
291 JsonCodec codec = codecService.getCodec(codecClazz);
292
293 infos.forEach(l -> array.add(codec.encode(l, context)));
294 } catch (ClassNotFoundException e) {
295 log.warn("Failed to send telemetry record due to {}", e);
296 }
297
298 root.set(LINK_INFOS, array);
299 return root;
300 }
Jian Li6803ccd2018-06-08 09:26:09 +0900301}