Refactor: move telemetry config from componentCfg to configs.xml
1. Support exporting the metrics to multiple targets
2. Add a set of properties to kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for telemetry cfg, XML cfg loader and dist store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
index d3958fe..ba9b34b 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
@@ -15,17 +15,35 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.BATCH_SIZE;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.CODEC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.LINGER_MS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.MEMORY_BUFFER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.REQUIRED_ACKS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.RETRIES;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.TOPIC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.VALUE_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
public final class DefaultKafkaTelemetryConfigTest {
@@ -55,11 +73,22 @@
private static final String VALUE_SERIALIZER_1 = "valueserializer1";
private static final String VALUE_SERIALIZER_2 = "valueserializer2";
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+
+ private static final String TOPIC_1 = "topic1";
+ private static final String TOPIC_2 = "topic2";
+
+ private static final String CODEC_1 = "codec1";
+ private static final String CODEC_2 = "codec2";
+
private static final Map<String, Object> CONFIG_MAP_1 =
ImmutableMap.of("key1", "value1");
private static final Map<String, Object> CONFIG_MAP_2 =
ImmutableMap.of("key2", "value2");
+ private static final String DUMMY = "dummy";
+
private KafkaTelemetryConfig config1;
private KafkaTelemetryConfig sameAsConfig1;
private KafkaTelemetryConfig config2;
@@ -87,6 +116,9 @@
.withLingerMs(LINGER_MS_1)
.withKeySerializer(KEY_SERIALIZER_1)
.withValueSerializer(VALUE_SERIALIZER_1)
+ .withKey(KEY_1)
+ .withTopic(TOPIC_1)
+ .withCodec(CODEC_1)
.withConfigMap(CONFIG_MAP_1)
.build();
@@ -100,6 +132,9 @@
.withLingerMs(LINGER_MS_1)
.withKeySerializer(KEY_SERIALIZER_1)
.withValueSerializer(VALUE_SERIALIZER_1)
+ .withKey(KEY_1)
+ .withTopic(TOPIC_1)
+ .withCodec(CODEC_1)
.withConfigMap(CONFIG_MAP_1)
.build();
@@ -113,6 +148,9 @@
.withLingerMs(LINGER_MS_2)
.withKeySerializer(KEY_SERIALIZER_2)
.withValueSerializer(VALUE_SERIALIZER_2)
+ .withKey(KEY_2)
+ .withTopic(TOPIC_2)
+ .withCodec(CODEC_2)
.withConfigMap(CONFIG_MAP_2)
.build();
}
@@ -151,6 +189,46 @@
assertThat(config.lingerMs(), is(LINGER_MS_1));
assertThat(config.keySerializer(), is(KEY_SERIALIZER_1));
assertThat(config.valueSerializer(), is(VALUE_SERIALIZER_1));
+ assertThat(config.key(), is(KEY_1));
+ assertThat(config.topic(), is(TOPIC_1));
+ assertThat(config.codec(), is(CODEC_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+ props.put(RETRIES, String.valueOf(RETRIES_1));
+ props.put(BATCH_SIZE, String.valueOf(BATCH_SIZE_1));
+ props.put(MEMORY_BUFFER, String.valueOf(MEMORY_BUFFER_1));
+ props.put(REQUIRED_ACKS, REQUIRED_ACKS_1);
+ props.put(LINGER_MS, String.valueOf(LINGER_MS_1));
+ props.put(KEY_SERIALIZER, KEY_SERIALIZER_1);
+ props.put(VALUE_SERIALIZER, VALUE_SERIALIZER_1);
+ props.put(KEY, KEY_1);
+ props.put(TOPIC, TOPIC_1);
+ props.put(CODEC, CODEC_1);
+
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, KAFKA,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+ assertThat(kafkaConfig.address(), is(IP_ADDRESS_1));
+ assertThat(kafkaConfig.port(), is(PORT_1));
+ assertThat(kafkaConfig.retries(), is(RETRIES_1));
+ assertThat(kafkaConfig.batchSize(), is(BATCH_SIZE_1));
+ assertThat(kafkaConfig.memoryBuffer(), is(MEMORY_BUFFER_1));
+ assertThat(kafkaConfig.requiredAcks(), is(REQUIRED_ACKS_1));
+ assertThat(kafkaConfig.lingerMs(), is(LINGER_MS_1));
+ assertThat(kafkaConfig.keySerializer(), is(KEY_SERIALIZER_1));
+ assertThat(kafkaConfig.valueSerializer(), is(VALUE_SERIALIZER_1));
+ assertThat(kafkaConfig.key(), is(KEY_1));
+ assertThat(kafkaConfig.topic(), is(TOPIC_1));
+ assertThat(kafkaConfig.codec(), is(CODEC_1));
+ }
+}
\ No newline at end of file