Refactor: move telemetry config from componentCfg to configs.xml
1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for the telemetry config, XML config loader, and distributed store
7. Add missing Javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e895e46..85f36e3 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,12 +15,17 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -31,9 +36,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
+import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+
/**
* Kafka telemetry manager.
*/
@@ -42,6 +54,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
+
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String RETRIES = "retries";
private static final String ACKS = "acks";
@@ -54,7 +68,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private Producer<String, byte[]> producer = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -74,64 +91,83 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("Kafka producer has already been started");
- return;
- }
+ public void start() {
+ telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
- KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
+ if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
+ StringBuilder kafkaServerBuilder = new StringBuilder();
+ kafkaServerBuilder.append(kafkaConfig.address());
+ kafkaServerBuilder.append(":");
+ kafkaServerBuilder.append(kafkaConfig.port());
- StringBuilder kafkaServerBuilder = new StringBuilder();
- kafkaServerBuilder.append(kafkaConfig.address());
- kafkaServerBuilder.append(":");
- kafkaServerBuilder.append(kafkaConfig.port());
+ // Configure Kafka server properties
+ Properties prop = new Properties();
+ prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+ prop.put(RETRIES, kafkaConfig.retries());
+ prop.put(ACKS, kafkaConfig.requiredAcks());
+ prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+ prop.put(LINGER_MS, kafkaConfig.lingerMs());
+ prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+ prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+ prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
- // Configure Kafka server properties
- Properties prop = new Properties();
- prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
- prop.put(RETRIES, kafkaConfig.retries());
- prop.put(ACKS, kafkaConfig.requiredAcks());
- prop.put(BATCH_SIZE, kafkaConfig.batchSize());
- prop.put(LINGER_MS, kafkaConfig.lingerMs());
- prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
- prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
- prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+ producers.put(c.name(), new KafkaProducer<>(prop));
+ }
+ });
- producer = new KafkaProducer<>(prop);
log.info("Kafka producer has Started");
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (!producers.isEmpty()) {
+ producers.values().forEach(Producer::close);
}
+ producers.clear();
+
log.info("Kafka producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
- public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+ public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("Kafka telemetry service has not been enabled!");
return null;
}
log.debug("Send telemetry record to kafka server...");
- return producer.send(record);
+ Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig =
+ fromTelemetryConfig(config);
+
+ try {
+ Class codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+
+ ByteBuffer buffer = codec.encode(flowInfos);
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+ futureSet.add(v.send(record));
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ log.warn("Failed to send telemetry record due to {}", e);
+ }
+ });
+ return futureSet;
}
@Override
public boolean isRunning() {
- return producer != null;
+ return !producers.isEmpty();
}
}