Add JSON string serializer for kafka telemetry exporter

Change-Id: I054b10e8356c10d9b0a37fe83fbe0a5d6907ef07
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 4ddb4c1..2078b2f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,19 +15,26 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.CodecService;
+import org.onosproject.codec.JsonCodec;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.LinkInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.TelemetryCodec;
 import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.rest.AbstractWebResource;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -58,13 +65,15 @@
 import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
 import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.Status.ENABLED;
 import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.flowsToLinks;
 import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.testConnectivity;
 
 /**
  * Kafka telemetry manager.
  */
 @Component(immediate = true, service = KafkaTelemetryAdminService.class)
-public class KafkaTelemetryManager implements KafkaTelemetryAdminService {
+public class KafkaTelemetryManager extends AbstractWebResource
+        implements KafkaTelemetryAdminService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -73,13 +82,24 @@
     private static final int RETRY_BACKOFF_MS_VAL = 10000;
     private static final int RECONNECT_BACKOFF_MS_VAL = 10000;
 
+    private static final String LINK_INFOS = "linkInfos";
+
+    private static final String BYTE_ARRAY_SERIALIZER =
+            "org.apache.kafka.common.serialization.ByteArraySerializer";
+    private static final String STRING_SERIALIZER =
+            "org.apache.kafka.common.serialization.StringSerializer";
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CodecService codecService;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected TelemetryConfigService telemetryConfigService;
 
-    private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
+    private Map<String, Producer<String, String>> stringProducers = Maps.newConcurrentMap();
+    private Map<String, Producer<String, byte[]>> byteProducers = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -101,27 +121,59 @@
     @Override
     public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
-        if (producers == null || producers.isEmpty()) {
+        boolean hasEmptyByteProducers =
+                byteProducers == null || byteProducers.isEmpty();
+        boolean hasEmptyStringProducers =
+                stringProducers == null || stringProducers.isEmpty();
+
+        if (hasEmptyByteProducers && hasEmptyStringProducers) {
             log.debug("Kafka telemetry service has not been enabled!");
             return null;
         }
 
         log.debug("Send telemetry record to kafka server...");
         Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
-        producers.forEach((k, v) -> {
+
+        byteProducers.forEach((k, v) -> {
             TelemetryConfig config = telemetryConfigService.getConfig(k);
             KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
 
-            try {
-                Class codecClazz = Class.forName(kafkaConfig.codec());
-                TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+            if (kafkaConfig != null &&
+                    BYTE_ARRAY_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+                try {
+                    Class codecClazz = Class.forName(kafkaConfig.codec());
+                    TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
 
-                ByteBuffer buffer = codec.encode(flowInfos);
-                ProducerRecord record = new ProducerRecord<>(
-                        kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
-                futureSet.add(v.send(record));
-            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
-                log.warn("Failed to send telemetry record due to {}", e);
+                    ByteBuffer buffer = codec.encode(flowInfos);
+                    ProducerRecord record = new ProducerRecord<>(
+                            kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+                    futureSet.add(v.send(record));
+                } catch (ClassNotFoundException |
+                        IllegalAccessException | InstantiationException e) {
+                    log.warn("Failed to send telemetry record due to {}", e);
+                }
+            }
+        });
+
+        stringProducers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+            if (kafkaConfig != null &&
+                    STRING_SERIALIZER.equals(kafkaConfig.valueSerializer())) {
+
+                // TODO: this is a workaround to convert flowInfo to linkInfo
+                // need to find a better solution
+
+                Set<LinkInfo> linkInfos = flowsToLinks(flowInfos);
+
+                if (!linkInfos.isEmpty()) {
+                    ProducerRecord record = new ProducerRecord<>(
+                            kafkaConfig.topic(), kafkaConfig.key(),
+                            encodeStrings(linkInfos, this,
+                                    kafkaConfig.codec()).toString());
+                    futureSet.add(v.send(record));
+                }
             }
         });
         return futureSet;
@@ -129,7 +181,7 @@
 
     @Override
     public boolean isRunning() {
-        return !producers.isEmpty();
+        return !byteProducers.isEmpty();
     }
 
     @Override
@@ -161,7 +213,14 @@
             prop.put(RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MS_VAL);
 
             if (testConnectivity(kafkaConfig.address(), kafkaConfig.port())) {
-                producers.put(name, new KafkaProducer<>(prop));
+                if (kafkaConfig.valueSerializer().equals(BYTE_ARRAY_SERIALIZER)) {
+                    byteProducers.put(name, new KafkaProducer<>(prop));
+                }
+
+                if (kafkaConfig.valueSerializer().equals(STRING_SERIALIZER)) {
+                    stringProducers.put(name, new KafkaProducer<>(prop));
+                }
+
                 success = true;
             } else {
                 log.warn("Unable to connect to {}:{}, " +
@@ -175,11 +234,17 @@
 
     @Override
     public void stop(String name) {
-        Producer<String, byte[]> producer = producers.get(name);
+        Producer<String, byte[]> byteProducer = byteProducers.get(name);
+        Producer<String, String> stringProducer = stringProducers.get(name);
 
-        if (producer != null) {
-            producer.close();
-            producers.remove(name);
+        if (byteProducer != null) {
+            byteProducer.close();
+            byteProducers.remove(name);
+        }
+
+        if (stringProducer != null) {
+            stringProducer.close();
+            stringProducers.remove(name);
         }
     }
 
@@ -197,11 +262,17 @@
 
     @Override
     public void stopAll() {
-        if (!producers.isEmpty()) {
-            producers.values().forEach(Producer::close);
+        if (!byteProducers.isEmpty()) {
+            byteProducers.values().forEach(Producer::close);
         }
 
-        producers.clear();
+        byteProducers.clear();
+
+        if (!stringProducers.isEmpty()) {
+            stringProducers.values().forEach(Producer::close);
+        }
+
+        stringProducers.clear();
 
         log.info("Kafka producer has Stopped");
     }
@@ -211,4 +282,21 @@
         stopAll();
         startAll();
     }
+
+    private ObjectNode encodeStrings(Set<LinkInfo> infos,
+                                     CodecContext context, String codecName) {
+        ObjectNode root = context.mapper().createObjectNode();
+        ArrayNode array = context.mapper().createArrayNode();
+        try {
+            Class codecClazz = Class.forName(codecName);
+            JsonCodec codec = codecService.getCodec(codecClazz);
+
+            infos.forEach(l -> array.add(codec.encode(l, context)));
+        } catch (ClassNotFoundException e) {
+            log.warn("Failed to send telemetry record due to {}", e);
+        }
+
+        root.set(LINK_INFOS, array);
+        return root;
+    }
 }