Refactor: move telemetry config from componentCfg to configs.xml

1. Support to export the metrics to multiple targets
2. Add a set of properties to kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import xml definitions
6. Add unit tests for telemetry cfg, xml cfg loader and dist store
7. Add missing javadoc for a set of implementation classes

Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
index d3958fe..ba9b34b 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
@@ -15,17 +15,35 @@
  */
 package org.onosproject.openstacktelemetry.config;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.testing.EqualsTester;
 import org.junit.Before;
 import org.junit.Test;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
 
 import java.util.Map;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.BATCH_SIZE;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.CODEC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.LINGER_MS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.MEMORY_BUFFER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.REQUIRED_ACKS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.RETRIES;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.TOPIC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.VALUE_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
 
 public final class DefaultKafkaTelemetryConfigTest {
 
@@ -55,11 +73,22 @@
     private static final String VALUE_SERIALIZER_1 = "valueserializer1";
     private static final String VALUE_SERIALIZER_2 = "valueserializer2";
 
+    private static final String KEY_1 = "key1";
+    private static final String KEY_2 = "key2";
+
+    private static final String TOPIC_1 = "topic1";
+    private static final String TOPIC_2 = "topic2";
+
+    private static final String CODEC_1 = "codec1";
+    private static final String CODEC_2 = "codec2";
+
     private static final Map<String, Object> CONFIG_MAP_1 =
             ImmutableMap.of("key1", "value1");
     private static final Map<String, Object> CONFIG_MAP_2 =
             ImmutableMap.of("key2", "value2");
 
+    private static final String DUMMY = "dummy";
+
     private KafkaTelemetryConfig config1;
     private KafkaTelemetryConfig sameAsConfig1;
     private KafkaTelemetryConfig config2;
@@ -87,6 +116,9 @@
                 .withLingerMs(LINGER_MS_1)
                 .withKeySerializer(KEY_SERIALIZER_1)
                 .withValueSerializer(VALUE_SERIALIZER_1)
+                .withKey(KEY_1)
+                .withTopic(TOPIC_1)
+                .withCodec(CODEC_1)
                 .withConfigMap(CONFIG_MAP_1)
                 .build();
 
@@ -100,6 +132,9 @@
                 .withLingerMs(LINGER_MS_1)
                 .withKeySerializer(KEY_SERIALIZER_1)
                 .withValueSerializer(VALUE_SERIALIZER_1)
+                .withKey(KEY_1)
+                .withTopic(TOPIC_1)
+                .withCodec(CODEC_1)
                 .withConfigMap(CONFIG_MAP_1)
                 .build();
 
@@ -113,6 +148,9 @@
                 .withLingerMs(LINGER_MS_2)
                 .withKeySerializer(KEY_SERIALIZER_2)
                 .withValueSerializer(VALUE_SERIALIZER_2)
+                .withKey(KEY_2)
+                .withTopic(TOPIC_2)
+                .withCodec(CODEC_2)
                 .withConfigMap(CONFIG_MAP_2)
                 .build();
     }
@@ -151,6 +189,46 @@
         assertThat(config.lingerMs(), is(LINGER_MS_1));
         assertThat(config.keySerializer(), is(KEY_SERIALIZER_1));
         assertThat(config.valueSerializer(), is(VALUE_SERIALIZER_1));
+        assertThat(config.key(), is(KEY_1));
+        assertThat(config.topic(), is(TOPIC_1));
+        assertThat(config.codec(), is(CODEC_1));
         assertThat(config.configMap(), is(CONFIG_MAP_1));
     }
-}
+
+    /**
+     * Tests props extraction.
+     */
+    @Test
+    public void testPropsExtraction() {
+        Map<String, String> props = Maps.newConcurrentMap();
+        props.put(ADDRESS, IP_ADDRESS_1);
+        props.put(PORT, String.valueOf(PORT_1));
+        props.put(RETRIES, String.valueOf(RETRIES_1));
+        props.put(BATCH_SIZE, String.valueOf(BATCH_SIZE_1));
+        props.put(MEMORY_BUFFER, String.valueOf(MEMORY_BUFFER_1));
+        props.put(REQUIRED_ACKS, REQUIRED_ACKS_1);
+        props.put(LINGER_MS, String.valueOf(LINGER_MS_1));
+        props.put(KEY_SERIALIZER, KEY_SERIALIZER_1);
+        props.put(VALUE_SERIALIZER, VALUE_SERIALIZER_1);
+        props.put(KEY, KEY_1);
+        props.put(TOPIC, TOPIC_1);
+        props.put(CODEC, CODEC_1);
+
+        TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, KAFKA,
+                ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+        KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+        assertThat(kafkaConfig.address(), is(IP_ADDRESS_1));
+        assertThat(kafkaConfig.port(), is(PORT_1));
+        assertThat(kafkaConfig.retries(), is(RETRIES_1));
+        assertThat(kafkaConfig.batchSize(), is(BATCH_SIZE_1));
+        assertThat(kafkaConfig.memoryBuffer(), is(MEMORY_BUFFER_1));
+        assertThat(kafkaConfig.requiredAcks(), is(REQUIRED_ACKS_1));
+        assertThat(kafkaConfig.lingerMs(), is(LINGER_MS_1));
+        assertThat(kafkaConfig.keySerializer(), is(KEY_SERIALIZER_1));
+        assertThat(kafkaConfig.valueSerializer(), is(VALUE_SERIALIZER_1));
+        assertThat(kafkaConfig.key(), is(KEY_1));
+        assertThat(kafkaConfig.topic(), is(TOPIC_1));
+        assertThat(kafkaConfig.codec(), is(CODEC_1));
+    }
+}
\ No newline at end of file