Refactor: move telemetry config from componentCfg to configs.xml

1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import xml definitions
6. Add unit tests for telemetry config, XML config loader and distributed store
7. Add missing Javadoc to a set of implementation classes

Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e895e46..85f36e3 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,12 +15,17 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -31,9 +36,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 
+import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+
 /**
  * Kafka telemetry manager.
  */
@@ -42,6 +54,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
+
     private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
     private static final String RETRIES = "retries";
     private static final String ACKS = "acks";
@@ -54,7 +68,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private Producer<String, byte[]> producer = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
+    private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -74,64 +91,83 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (producer != null) {
-            log.info("Kafka producer has already been started");
-            return;
-        }
+    public void start() {
+        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
+            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
 
-        KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
+            if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
+                StringBuilder kafkaServerBuilder = new StringBuilder();
+                kafkaServerBuilder.append(kafkaConfig.address());
+                kafkaServerBuilder.append(":");
+                kafkaServerBuilder.append(kafkaConfig.port());
 
-        StringBuilder kafkaServerBuilder = new StringBuilder();
-        kafkaServerBuilder.append(kafkaConfig.address());
-        kafkaServerBuilder.append(":");
-        kafkaServerBuilder.append(kafkaConfig.port());
+                // Configure Kafka server properties
+                Properties prop = new Properties();
+                prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+                prop.put(RETRIES, kafkaConfig.retries());
+                prop.put(ACKS, kafkaConfig.requiredAcks());
+                prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+                prop.put(LINGER_MS, kafkaConfig.lingerMs());
+                prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+                prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+                prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
 
-        // Configure Kafka server properties
-        Properties prop = new Properties();
-        prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
-        prop.put(RETRIES, kafkaConfig.retries());
-        prop.put(ACKS, kafkaConfig.requiredAcks());
-        prop.put(BATCH_SIZE, kafkaConfig.batchSize());
-        prop.put(LINGER_MS, kafkaConfig.lingerMs());
-        prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
-        prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
-        prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+                producers.put(c.name(), new KafkaProducer<>(prop));
+            }
+        });
 
-        producer = new KafkaProducer<>(prop);
         log.info("Kafka producer has Started");
     }
 
     @Override
     public void stop() {
-        if (producer != null) {
-            producer.close();
-            producer = null;
+        if (!producers.isEmpty()) {
+            producers.values().forEach(Producer::close);
         }
 
+        producers.clear();
+
         log.info("Kafka producer has Stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
-    public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+    public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
-        if (producer == null) {
+        if (producers == null || producers.isEmpty()) {
             log.debug("Kafka telemetry service has not been enabled!");
             return null;
         }
 
         log.debug("Send telemetry record to kafka server...");
-        return producer.send(record);
+        Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
+        producers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            KafkaTelemetryConfig kafkaConfig =
+                    fromTelemetryConfig(config);
+
+            try {
+                Class codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
+                TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+
+                ByteBuffer buffer = codec.encode(flowInfos);
+                ProducerRecord record = new ProducerRecord<>(
+                        kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+                futureSet.add(v.send(record));
+            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+                log.warn("Failed to send telemetry record due to {}", e);
+            }
+        });
+        return futureSet;
     }
 
     @Override
     public boolean isRunning() {
-        return producer != null;
+        return !producers.isEmpty();
     }
 }