Add JSON string serializer for kafka telemetry exporter
Change-Id: I054b10e8356c10d9b0a37fe83fbe0a5d6907ef07
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 4ddb4c1..2078b2f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,19 +15,26 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryCodec;
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.rest.AbstractWebResource;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -58,13 +65,15 @@
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
/**
* Kafka telemetry manager.
*/
@Component(immediate = true, service = KafkaTelemetryAdminService.class)
-public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
+public class KafkaTelemetryManager extends AbstractWebResource
+ implements KafkaTelemetryAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -73,13 +82,24 @@
private static final int RETRY_BACKOFF_MS_VAL = 10000;
private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
+ private static final String LINK_INFOS = "linkInfos";
+
+ private static final String BYTE_ARRAY_SERIALIZER =
+ "org.apache.kafka.common.serialization.ByteArraySerializer";
+ private static final String STRING_SERIALIZER =
+ "org.apache.kafka.common.serialization.StringSerializer";
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CodecService codecService;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected TelemetryConfigService telemetryConfigService;
- private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
+ private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
+ private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -101,27 +121,59 @@
@Override
public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- if (producers == null || producers.isEmpty()) {
+ boolean hasEmptyByteProducers =
+ byteProducers == null || byteProducers.isEmpty();
+ boolean hasEmptyStringProducers =
+ stringProducers == null || stringProducers.isEmpty();
+
+ if (hasEmptyByteProducers && hasEmptyStringProducers) {
log.debug("Kafka telemetry service has not been enabled!");
return null;
}
log.debug("Send telemetry record to kafka server...");
Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
- producers.forEach((k, v) -> {
+
+ byteProducers.forEach((k, v) -> {
TelemetryConfig config = telemetryConfigService.getConfig(k);
KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
- try {
- Class codecClazz = Class.forName(kafkaConfig.codec());
- TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+ if (kafkaConfig != null &&
+ BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+ try {
+ Class<?> codecClazz = Class.forName(kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
- ByteBuffer buffer = codec.encode(flowInfos);
- ProducerRecord record = new ProducerRecord<>(
- kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
- futureSet.add(v.send(record));
- } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
- log.warn("Failed to send telemetry record due to {}", e);
+ ByteBuffer buffer = codec.encode(flowInfos);
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+ futureSet.add(v.send(record));
+ } catch (ClassNotFoundException |
+ IllegalAccessException | InstantiationException e) {
+ log.warn("Failed to send telemetry record", e);
+ }
+ }
+ });
+
+ stringProducers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+ if (kafkaConfig != null &&
+ STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+
+ // TODO: this is a workaround to convert flowInfo to linkInfo
+ // need to find a better solution
+
+ Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+
+ if (!linkInfos.isEmpty()) {
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(),
+ encodeStrings(linkInfos, this,
+ kafkaConfig.codec()).toString());
+ futureSet.add(v.send(record));
+ }
}
});
return futureSet;
@@ -129,7 +181,7 @@
@Override
public boolean isRunning() {
- return !producers.isEmpty();
+ return !byteProducers.isEmpty() || !stringProducers.isEmpty();
}
@Override
@@ -161,7 +213,14 @@
prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
- producers.put(name, new KafkaProducer<>(prop));
+ if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
+ byteProducers.put(name, new KafkaProducer<>(prop));
+ }
+
+ if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
+ stringProducers.put(name, new KafkaProducer<>(prop));
+ }
+
success = true;
} else {
log.warn("Unable to connect to {}:{}, " +
@@ -175,11 +234,17 @@
@Override
public void stop(String name) {
- Producer<String, byte[]> producer = producers.get(name);
+ Producer<String, byte[]> byteProducer = byteProducers.get(name);
+ Producer<String, String> stringProducer = stringProducers.get(name);
- if (producer != null) {
- producer.close();
- producers.remove(name);
+ if (byteProducer != null) {
+ byteProducer.close();
+ byteProducers.remove(name);
+ }
+
+ if (stringProducer != null) {
+ stringProducer.close();
+ stringProducers.remove(name);
}
}
@@ -197,11 +262,17 @@
@Override
public void stopAll() {
- if (!producers.isEmpty()) {
- producers.values().forEach(Producer::close);
+ if (!byteProducers.isEmpty()) {
+ byteProducers.values().forEach(Producer::close);
}
- producers.clear();
+ byteProducers.clear();
+
+ if (!stringProducers.isEmpty()) {
+ stringProducers.values().forEach(Producer::close);
+ }
+
+ stringProducers.clear();
log.info("Kafka producer has Stopped");
}
@@ -211,4 +282,21 @@
stopAll();
startAll();
}
+
+ private ObjectNode encodeStrings(Set<LinkInfo> infos,
+ CodecContext context, String codecName) {
+ ObjectNode root = context.mapper().createObjectNode();
+ ArrayNode array = context.mapper().createArrayNode();
+ try {
+ Class<?> codecClazz = Class.forName(codecName);
+ JsonCodec codec = codecService.getCodec(codecClazz);
+
+ infos.forEach(l -> array.add(codec.encode(l, context)));
+ } catch (ClassNotFoundException e) {
+ log.warn("Failed to send telemetry record", e);
+ }
+
+ root.set(LINK_INFOS, array);
+ return root;
+ }
}