blob: 2078b2fa5d0e53b7294a5e6db53fc6434933c9e4 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Lida8867f2019-01-31 01:17:36 +090018import com.fasterxml.jackson.databind.node.ArrayNode;
19import com.fasterxml.jackson.databind.node.ObjectNode;
Jian Li69600e02018-12-24 13:21:18 +090020import com.google.common.collect.Maps;
21import com.google.common.collect.Sets;
Jian Li6803ccd2018-06-08 09:26:09 +090022import org.apache.kafka.clients.producer.KafkaProducer;
23import org.apache.kafka.clients.producer.Producer;
24import org.apache.kafka.clients.producer.ProducerRecord;
25import org.apache.kafka.clients.producer.RecordMetadata;
Jian Lida8867f2019-01-31 01:17:36 +090026import org.onosproject.codec.CodecContext;
27import org.onosproject.codec.CodecService;
28import org.onosproject.codec.JsonCodec;
Jian Li69600e02018-12-24 13:21:18 +090029import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090030import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
Jian Lida8867f2019-01-31 01:17:36 +090031import org.onosproject.openstacktelemetry.api.LinkInfo;
Jian Lid1ce10a2018-06-12 13:47:23 +090032import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li69600e02018-12-24 13:21:18 +090033import org.onosproject.openstacktelemetry.api.TelemetryCodec;
34import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
Jian Li6803ccd2018-06-08 09:26:09 +090035import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
36import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Jian Lida8867f2019-01-31 01:17:36 +090037import org.onosproject.rest.AbstractWebResource;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070038import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090043import org.slf4j.Logger;
44import org.slf4j.LoggerFactory;
45
Jian Li69600e02018-12-24 13:21:18 +090046import java.nio.ByteBuffer;
47import java.util.Map;
Jian Li6803ccd2018-06-08 09:26:09 +090048import java.util.Properties;
Jian Li69600e02018-12-24 13:21:18 +090049import java.util.Set;
Jian Li6803ccd2018-06-08 09:26:09 +090050import java.util.concurrent.Future;
51
Jian Lib3b01682019-01-09 17:10:12 +090052import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
53import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
54import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
55import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
56import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
57import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
58import static org.apache.kafka.clients.producer.ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG;
59import static org.apache.kafka.clients.producer.ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG;
60import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
61import static org.apache.kafka.clients.producer.ProducerConfig.RETRY_BACKOFF_MS_CONFIG;
62import static org.apache.kafka.clients.producer.ProducerConfig.TIMEOUT_CONFIG;
63import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
Jian Li69600e02018-12-24 13:21:18 +090064import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
65import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
Jian Li667c6eb2019-01-07 23:01:12 +090066import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
Jian Li69600e02018-12-24 13:21:18 +090067import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
Jian Lida8867f2019-01-31 01:17:36 +090068import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
Jian Li667c6eb2019-01-07 23:01:12 +090069import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
Jian Li69600e02018-12-24 13:21:18 +090070
Jian Li6803ccd2018-06-08 09:26:09 +090071/**
72 * Kafka telemetry manager.
73 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070074@Component(immediate = true, service = KafkaTelemetryAdminService.class)
Jian Lida8867f2019-01-31 01:17:36 +090075public class KafkaTelemetryManager extends AbstractWebResource
76 implements KafkaTelemetryAdminService {
Jian Li6803ccd2018-06-08 09:26:09 +090077
78 private final Logger log = LoggerFactory.getLogger(getClass());
79
Jian Lib3b01682019-01-09 17:10:12 +090080 private static final int METADATA_FETCH_TIMEOUT_VAL = 300;
81 private static final int TIMEOUT_VAL = 300;
82 private static final int RETRY_BACKOFF_MS_VAL = 10000;
83 private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
Jian Li6803ccd2018-06-08 09:26:09 +090084
Jian Lida8867f2019-01-31 01:17:36 +090085 private static final String LINK_INFOS = "linkInfos";
86
87 private static final String BYTE_ARRAY_SERIALIZER =
88 "org.apache.kafka.common.serialization.ByteArraySerializer";
89 private static final String STRING_SERIALIZER =
90 "org.apache.kafka.common.serialization.StringSerializer";
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected CodecService codecService;
94
Ray Milkeyd84f89b2018-08-17 14:54:17 -070095 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090096 protected OpenstackTelemetryService openstackTelemetryService;
97
Jian Li69600e02018-12-24 13:21:18 +090098 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected TelemetryConfigService telemetryConfigService;
100
Jian Lida8867f2019-01-31 01:17:36 +0900101 private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
102 private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
Jian Li6803ccd2018-06-08 09:26:09 +0900103
104 @Activate
105 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +0900106
107 openstackTelemetryService.addTelemetryService(this);
108
Jian Li6803ccd2018-06-08 09:26:09 +0900109 log.info("Started");
110 }
111
112 @Deactivate
113 protected void deactivate() {
Jian Lia61e0b62018-12-28 19:10:10 +0900114 stopAll();
Jian Lid1ce10a2018-06-12 13:47:23 +0900115
116 openstackTelemetryService.removeTelemetryService(this);
117
Jian Li6803ccd2018-06-08 09:26:09 +0900118 log.info("Stopped");
119 }
120
121 @Override
Jian Li69600e02018-12-24 13:21:18 +0900122 public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900123
Jian Lida8867f2019-01-31 01:17:36 +0900124 boolean hasEmptyByteProducers =
125 byteProducers == null || byteProducers.isEmpty();
126 boolean hasEmptyStringProducers =
127 stringProducers == null || stringProducers.isEmpty();
128
129 if (hasEmptyByteProducers && hasEmptyStringProducers) {
Jian Lia4947682018-07-07 14:53:32 +0900130 log.debug("Kafka telemetry service has not been enabled!");
Jian Lid1ce10a2018-06-12 13:47:23 +0900131 return null;
132 }
133
Jian Li85573f42018-06-27 22:29:14 +0900134 log.debug("Send telemetry record to kafka server...");
Jian Li69600e02018-12-24 13:21:18 +0900135 Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
Jian Lida8867f2019-01-31 01:17:36 +0900136
137 byteProducers.forEach((k, v) -> {
Jian Li69600e02018-12-24 13:21:18 +0900138 TelemetryConfig config = telemetryConfigService.getConfig(k);
Jian Li7fe7eaf2018-12-31 17:00:33 +0900139 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
Jian Li69600e02018-12-24 13:21:18 +0900140
Jian Lida8867f2019-01-31 01:17:36 +0900141 if (kafkaConfig != null &&
142 BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
143 try {
144 Class codecClazz = Class.forName(kafkaConfig.codec());
145 TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
Jian Li69600e02018-12-24 13:21:18 +0900146
Jian Lida8867f2019-01-31 01:17:36 +0900147 ByteBuffer buffer = codec.encode(flowInfos);
148 ProducerRecord record = new ProducerRecord<>(
149 kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
150 futureSet.add(v.send(record));
151 } catch (ClassNotFoundException |
152 IllegalAccessException | InstantiationException e) {
153 log.warn("Failed to send telemetry record due to {}", e);
154 }
155 }
156 });
157
158 stringProducers.forEach((k, v) -> {
159 TelemetryConfig config = telemetryConfigService.getConfig(k);
160 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
161
162 if (kafkaConfig != null &&
163 STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
164
165 // TODO: this is a workaround to convert flowInfo to linkInfo
166 // need to find a better solution
167
168 Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
169
170 if (!linkInfos.isEmpty()) {
171 ProducerRecord record = new ProducerRecord<>(
172 kafkaConfig.topic(), kafkaConfig.key(),
173 encodeStrings(linkInfos, this,
174 kafkaConfig.codec()).toString());
175 futureSet.add(v.send(record));
176 }
Jian Li69600e02018-12-24 13:21:18 +0900177 }
178 });
179 return futureSet;
Jian Li6803ccd2018-06-08 09:26:09 +0900180 }
Jian Lid1ce10a2018-06-12 13:47:23 +0900181
182 @Override
183 public boolean isRunning() {
Jian Lida8867f2019-01-31 01:17:36 +0900184 return !byteProducers.isEmpty();
Jian Lid1ce10a2018-06-12 13:47:23 +0900185 }
Jian Lia61e0b62018-12-28 19:10:10 +0900186
187 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900188 public boolean start(String name) {
189 boolean success = false;
Jian Lia61e0b62018-12-28 19:10:10 +0900190 TelemetryConfig config = telemetryConfigService.getConfig(name);
191 KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
192
Jian Li667c6eb2019-01-07 23:01:12 +0900193 if (kafkaConfig != null && !config.name().equals(KAFKA_SCHEME) &&
194 config.status() == ENABLED) {
Jian Lia61e0b62018-12-28 19:10:10 +0900195 StringBuilder kafkaServerBuilder = new StringBuilder();
196 kafkaServerBuilder.append(kafkaConfig.address());
197 kafkaServerBuilder.append(":");
198 kafkaServerBuilder.append(kafkaConfig.port());
199
200 // Configure Kafka server properties
201 Properties prop = new Properties();
Jian Lib3b01682019-01-09 17:10:12 +0900202 prop.put(BOOTSTRAP_SERVERS_CONFIG, kafkaServerBuilder.toString());
203 prop.put(RETRIES_CONFIG, kafkaConfig.retries());
204 prop.put(ACKS_CONFIG, kafkaConfig.requiredAcks());
205 prop.put(BATCH_SIZE_CONFIG, kafkaConfig.batchSize());
206 prop.put(LINGER_MS_CONFIG, kafkaConfig.lingerMs());
207 prop.put(BUFFER_MEMORY_CONFIG, kafkaConfig.memoryBuffer());
208 prop.put(KEY_SERIALIZER_CLASS_CONFIG, kafkaConfig.keySerializer());
209 prop.put(VALUE_SERIALIZER_CLASS_CONFIG, kafkaConfig.valueSerializer());
210 prop.put(METADATA_FETCH_TIMEOUT_CONFIG, METADATA_FETCH_TIMEOUT_VAL);
211 prop.put(TIMEOUT_CONFIG, TIMEOUT_VAL);
212 prop.put(RETRY_BACKOFF_MS_CONFIG, RETRY_BACKOFF_MS_VAL);
213 prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
Jian Lia61e0b62018-12-28 19:10:10 +0900214
Jian Li667c6eb2019-01-07 23:01:12 +0900215 if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
Jian Lida8867f2019-01-31 01:17:36 +0900216 if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
217 byteProducers.put(name, new KafkaProducer<>(prop));
218 }
219
220 if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
221 stringProducers.put(name, new KafkaProducer<>(prop));
222 }
223
Jian Li667c6eb2019-01-07 23:01:12 +0900224 success = true;
225 } else {
226 log.warn("Unable to connect to {}:{}, " +
227 "please check the connectivity manually",
228 kafkaConfig.address(), kafkaConfig.port());
229 }
Jian Lia61e0b62018-12-28 19:10:10 +0900230 }
Jian Li667c6eb2019-01-07 23:01:12 +0900231
232 return success;
Jian Lia61e0b62018-12-28 19:10:10 +0900233 }
234
235 @Override
236 public void stop(String name) {
Jian Lida8867f2019-01-31 01:17:36 +0900237 Producer<String, byte[]> byteProducer = byteProducers.get(name);
238 Producer<String, String> stringProducer = stringProducers.get(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900239
Jian Lida8867f2019-01-31 01:17:36 +0900240 if (byteProducer != null) {
241 byteProducer.close();
242 byteProducers.remove(name);
243 }
244
245 if (stringProducer != null) {
246 stringProducer.close();
247 stringProducers.remove(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900248 }
249 }
250
251 @Override
Jian Li667c6eb2019-01-07 23:01:12 +0900252 public boolean restart(String name) {
Jian Lia61e0b62018-12-28 19:10:10 +0900253 stop(name);
Jian Li667c6eb2019-01-07 23:01:12 +0900254 return start(name);
Jian Lia61e0b62018-12-28 19:10:10 +0900255 }
256
257 @Override
258 public void startAll() {
259 telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
260 log.info("Kafka producer has Started");
261 }
262
263 @Override
264 public void stopAll() {
Jian Lida8867f2019-01-31 01:17:36 +0900265 if (!byteProducers.isEmpty()) {
266 byteProducers.values().forEach(Producer::close);
Jian Lia61e0b62018-12-28 19:10:10 +0900267 }
268
Jian Lida8867f2019-01-31 01:17:36 +0900269 byteProducers.clear();
270
271 if (!stringProducers.isEmpty()) {
272 stringProducers.values().forEach(Producer::close);
273 }
274
275 stringProducers.clear();
Jian Lia61e0b62018-12-28 19:10:10 +0900276
277 log.info("Kafka producer has Stopped");
278 }
279
280 @Override
281 public void restartAll() {
282 stopAll();
283 startAll();
284 }
Jian Lida8867f2019-01-31 01:17:36 +0900285
286 private ObjectNode encodeStrings(Set<LinkInfo> infos,
287 CodecContext context, String codecName) {
288 ObjectNode root = context.mapper().createObjectNode();
289 ArrayNode array = context.mapper().createArrayNode();
290 try {
291 Class codecClazz = Class.forName(codecName);
292 JsonCodec codec = codecService.getCodec(codecClazz);
293
294 infos.forEach(l -> array.add(codec.encode(l, context)));
295 } catch (ClassNotFoundException e) {
296 log.warn("Failed to send telemetry record due to {}", e);
297 }
298
299 root.set(LINK_INFOS, array);
300 return root;
301 }
Jian Li6803ccd2018-06-08 09:26:09 +0900302}