Refactor: move telemetry config from componentCfg to configs.xml

1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add a distributed store to manage telemetry configs
4. Add a CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for telemetry config, XML config loader and distributed store
7. Add missing Javadoc for a set of implementation classes

Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
new file mode 100644
index 0000000..486cea5
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+
+/**
+ * Abstract bootstrapper for loading and registering telemetry configurations
+ * that are independent from the default telemetry configurations.
+ */
+public abstract class AbstractTelemetryConfigLoader {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // set only after a successful registration; null when loading failed
+    private TelemetryConfigProvider provider;
+    private final String path;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigAdminService configAdminService;
+
+    /**
+     * Creates a new loader for resource with the specified path.
+     *
+     * @param path configurations definition XML resource path
+     */
+    protected AbstractTelemetryConfigLoader(String path) {
+        this.path = path;
+    }
+
+    /**
+     * Loads the configuration definitions from the XML resource and registers
+     * the resulting provider. A failure is logged and leaves no provider
+     * registered; it does not prevent the component from activating.
+     */
+    @Activate
+    protected void activate() {
+        // the stream is closed once loading is done; a missing resource is still passed on as null
+        try (InputStream stream = getClass().getResourceAsStream(path)) {
+            TelemetryConfigProvider loaded =
+                    new XmlTelemetryConfigLoader().loadTelemetryConfigs(stream);
+            configAdminService.registerProvider(loaded);
+            // remember the provider only once it has actually been registered
+            provider = loaded;
+        } catch (Exception e) {
+            log.error("Unable to load {} telemetry configuration definitions", path, e);
+        }
+        log.info("Started");
+    }
+
+    /**
+     * Unregisters the provider, if one was registered during activation.
+     */
+    @Deactivate
+    protected void deactivate() {
+        if (provider != null) {
+            configAdminService.unregisterProvider(provider);
+            provider = null;
+        }
+        log.info("Stopped");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
index 2b2d7b6..9ee84f8 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -24,6 +24,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+/**
+ * Implementation of StatsFlowRule.
+ */
 public final class DefaultStatsFlowRule implements StatsFlowRule {
     private final IpPrefix srcIpPrefix;
     private final IpPrefix dstIpPrefix;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
new file mode 100644
index 0000000..368b36b
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.ImmutableMap.copyOf;
+
+/**
+ * Implementation of TelemetryConfig.
+ * <p>
+ * Two configurations are considered equal when their names are equal; the
+ * remaining attributes do not take part in {@code equals} or {@code hashCode}.
+ */
+public final class DefaultTelemetryConfig implements TelemetryConfig {
+    private final String name;
+    private final ConfigType type;
+    private final List<TelemetryConfig> parents;
+
+    private final String manufacturer;
+    private final String swVersion;
+    private final boolean enabled;
+
+    private final Map<String, String> properties;
+
+    /**
+     * Creates a configuration with the specified name.
+     *
+     * @param name          configuration name
+     * @param type          configuration type
+     * @param parents       optional parent configurations
+     * @param manufacturer  off-platform application manufacturer
+     * @param swVersion     off-platform application software version
+     * @param enabled       service enable flag
+     * @param properties    properties for telemetry configuration
+     */
+    public DefaultTelemetryConfig(String name, ConfigType type,
+                                  List<TelemetryConfig> parents,
+                                  String manufacturer, String swVersion,
+                                  boolean enabled, Map<String, String> properties) {
+        this.name = checkNotNull(name, "Name cannot be null");
+        this.type = checkNotNull(type, "type cannot be null");
+        this.parents = parents == null ? ImmutableList.of() : ImmutableList.copyOf(parents);
+        this.manufacturer = checkNotNull(manufacturer, "Manufacturer cannot be null");
+        this.swVersion = checkNotNull(swVersion, "SW version cannot be null");
+        this.properties = copyOf(checkNotNull(properties, "Properties cannot be null"));
+        this.enabled = enabled;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public ConfigType type() {
+        return type;
+    }
+
+    @Override
+    public List<TelemetryConfig> parents() {
+        if (parents == null) {
+            return ImmutableList.of();
+        } else {
+            return ImmutableList.copyOf(parents);
+        }
+    }
+
+    @Override
+    public String manufacturer() {
+        return manufacturer;
+    }
+
+    @Override
+    public String swVersion() {
+        return swVersion;
+    }
+
+    @Override
+    public boolean enabled() {
+        return enabled;
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        if (properties == null) {
+            return ImmutableMap.of();
+        } else {
+            return ImmutableMap.copyOf(properties);
+        }
+    }
+
+    @Override
+    public String getProperty(String name) {
+        // breadth-first lookup: this configuration first, then its parents level by level
+        Queue<TelemetryConfig> queue = new LinkedList<>();
+        queue.add(this);
+        while (!queue.isEmpty()) {
+            TelemetryConfig config = queue.remove();
+            String property = config.properties().get(name);
+            if (property != null) {
+                return property;
+            } else if (config.parents() != null) {
+                queue.addAll(config.parents());
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public TelemetryConfig merge(TelemetryConfig other) {
+        // merge the properties; values of the other configuration win on conflict
+        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+        properties.putAll(other.properties());
+
+        // remove duplicated properties from this configuration and merge
+        this.properties().entrySet().stream()
+                .filter(e -> !other.properties().containsKey(e.getKey()))
+                .forEach(properties::put);
+
+        return new DefaultTelemetryConfig(name, type,
+                mergeParents(parents(), other.parents()),
+                manufacturer, swVersion, enabled, properties.build());
+    }
+
+    /**
+     * Combines two parent lists into one holding a single entry per name.
+     * All of our parents are kept in their original order; a parent of the
+     * other configuration is merged into our parent of the same name when
+     * there is one, and appended otherwise.
+     *
+     * @param ours   parents of this configuration
+     * @param theirs parents of the other configuration, may be null
+     * @return combined list of parents
+     */
+    private static List<TelemetryConfig> mergeParents(List<TelemetryConfig> ours,
+                                                      List<TelemetryConfig> theirs) {
+        List<TelemetryConfig> merged = new ArrayList<>(ours);
+        if (theirs == null) {
+            return merged;
+        }
+        for (TelemetryConfig candidate : theirs) {
+            // look up the already collected parent carrying the same name, if any
+            int index = 0;
+            while (index < merged.size()
+                    && !merged.get(index).name().equals(candidate.name())) {
+                index++;
+            }
+            if (index == merged.size()) {
+                merged.add(candidate);
+            } else {
+                merged.set(index, merged.get(index).merge(candidate));
+            }
+        }
+        return merged;
+    }
+
+    @Override
+    public Set<String> keys() {
+        return properties.keySet();
+    }
+
+    @Override
+    public String value(String key) {
+        return properties.get(key);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("name", name)
+                .add("type", type)
+                .add("parents", parents)
+                .add("manufacturer", manufacturer)
+                .add("swVersion", swVersion)
+                .add("enabled", enabled)
+                .add("properties", properties)
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object configToBeCompared) {
+        if (this == configToBeCompared) {
+            return true;
+        }
+
+        if (configToBeCompared == null || getClass() != configToBeCompared.getClass()) {
+            return false;
+        }
+
+        DefaultTelemetryConfig telemetryConfig =
+                                    (DefaultTelemetryConfig) configToBeCompared;
+        return name.equals(telemetryConfig.name());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(name);
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
new file mode 100644
index 0000000..5fb1e70
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Telemetry configuration provider backed by a concurrent map that is keyed
+ * by configuration name.
+ */
+public class DefaultTelemetryConfigProvider implements TelemetryConfigProvider {
+
+    protected final Map<String, TelemetryConfig> configs = Maps.newConcurrentMap();
+
+    @Override
+    public Set<TelemetryConfig> getTelemetryConfigs() {
+        return ImmutableSet.copyOf(configs.values());
+    }
+
+    /**
+     * Registers the given configuration under its name. A configuration whose
+     * name is not known yet is stored as is; otherwise the configuration already
+     * stored under that name is merged with the given one and the merged result
+     * takes its place.
+     *
+     * @param config telemetry configuration to be provided
+     * @return registered configuration
+     */
+    public TelemetryConfig addConfig(TelemetryConfig config) {
+        return configs.merge(config.name(), config, TelemetryConfig::merge);
+    }
+
+    /**
+     * Drops the configuration stored under the name of the given one.
+     *
+     * @param config telemetry configuration
+     */
+    public void removeConfig(TelemetryConfig config) {
+        configs.remove(config.name());
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("configs", configs)
+                .toString();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
new file mode 100644
index 0000000..b1d6133
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_ADDED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_DELETED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Distributed store of telemetry configurations, kept in a {@code ConsistentMap}
+ * keyed by configuration name. Changes of the map are relayed to the store
+ * delegate as telemetry configuration events.
+ */
+@Component(immediate = true, service = TelemetryConfigStore.class)
+public class DistributedTelemetryConfigStore
+        extends AbstractStore<TelemetryConfigEvent, TelemetryConfigStoreDelegate>
+        implements TelemetryConfigStore {
+
+    protected final Logger log = getLogger(getClass());
+
+    private static final String ERR_NOT_FOUND = " does not exist";
+    private static final String ERR_DUPLICATE = " already exists";
+
+    private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(TelemetryConfigProvider.class)
+                    .register(DefaultTelemetryConfigProvider.class)
+                    .register(TelemetryConfig.class)
+                    .register(TelemetryConfig.ConfigType.class)
+                    .register(DefaultTelemetryConfig.class)
+                    .build();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(getClass().getSimpleName(), "event-handler", log));
+
+    private final MapEventListener<String, TelemetryConfig>
+            telemetryConfigMapListener = new TelemetryConfigMapListener();
+
+    private ConsistentMap<String, TelemetryConfig> telemetryConfigStore;
+
+    @Activate
+    protected void activate() {
+        ApplicationId appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+
+        telemetryConfigStore = storageService.<String, TelemetryConfig>consistentMapBuilder()
+                .withSerializer(Serializer.using(SERIALIZER_TELEMETRY_CONFIG))
+                .withName("telemetry-config-store")
+                .withApplicationId(appId)
+                .build();
+        telemetryConfigStore.addListener(telemetryConfigMapListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        telemetryConfigStore.removeListener(telemetryConfigMapListener);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public void createTelemetryConfig(TelemetryConfig config) {
+        telemetryConfigStore.compute(config.name(), (name, existing) -> {
+            checkArgument(existing == null, config.name() + ERR_DUPLICATE);
+            return config;
+        });
+    }
+
+    @Override
+    public void updateTelemetryConfig(TelemetryConfig config) {
+        telemetryConfigStore.compute(config.name(), (name, existing) -> {
+            checkArgument(existing != null, config.name() + ERR_NOT_FOUND);
+            return config;
+        });
+    }
+
+    @Override
+    public TelemetryConfig removeTelemetryConfig(String name) {
+        Versioned<TelemetryConfig> removed = telemetryConfigStore.remove(name);
+        if (removed == null) {
+            return null;
+        }
+        return removed.value();
+    }
+
+    @Override
+    public TelemetryConfig telemetryConfig(String name) {
+        return telemetryConfigStore.asJavaMap().get(name);
+    }
+
+    @Override
+    public Set<TelemetryConfig> telemetryConfigs() {
+        return ImmutableSet.copyOf(telemetryConfigStore.asJavaMap().values());
+    }
+
+    @Override
+    public Set<TelemetryConfig> telemetryConfigsByType(ConfigType type) {
+        Set<TelemetryConfig> matching = telemetryConfigStore.asJavaMap().values()
+                .stream()
+                .filter(c -> c.type() == type)
+                .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(matching);
+    }
+
+    @Override
+    public void clear() {
+        telemetryConfigStore.clear();
+    }
+
+    /**
+     * Relays changes of the telemetry configuration map to the store delegate.
+     * Every change is handed over to the single-threaded event executor.
+     */
+    private class TelemetryConfigMapListener
+                        implements MapEventListener<String, TelemetryConfig> {
+
+        @Override
+        public void event(MapEvent<String, TelemetryConfig> event) {
+            switch (event.type()) {
+                case INSERT:
+                    eventExecutor.execute(() -> onInsert(event));
+                    break;
+                case UPDATE:
+                    eventExecutor.execute(() -> onUpdate(event));
+                    break;
+                case REMOVE:
+                    eventExecutor.execute(() -> onRemove(event));
+                    break;
+                default:
+                    log.error("Unsupported telemetry config event type");
+                    break;
+            }
+        }
+
+        private void onInsert(MapEvent<String, TelemetryConfig> mapEvent) {
+            log.debug("Telemetry config created");
+            TelemetryConfig created = mapEvent.newValue().value();
+            notifyDelegate(new TelemetryConfigEvent(CONFIG_ADDED, created));
+        }
+
+        private void onUpdate(MapEvent<String, TelemetryConfig> mapEvent) {
+            log.debug("Telemetry config updated");
+            TelemetryConfig updated = mapEvent.newValue().value();
+            notifyDelegate(new TelemetryConfigEvent(CONFIG_UPDATED, updated));
+        }
+
+        private void onRemove(MapEvent<String, TelemetryConfig> mapEvent) {
+            log.debug("Telemetry config removed");
+            TelemetryConfig removed = mapEvent.oldValue().value();
+            notifyDelegate(new TelemetryConfigEvent(CONFIG_DELETED, removed));
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
new file mode 100644
index 0000000..f7e4693
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Telemetry configuration loader that picks up the gRPC configuration definitions.
+ */
+@Component(immediate = true)
+public class GrpcConfigsLoader extends AbstractTelemetryConfigLoader {
+
+    private static final String GRPC_CONFIGS_XML = "grpc-configs.xml";
+
+    /**
+     * Creates a loader for the gRPC configuration definition resource.
+     */
+    public GrpcConfigsLoader() {
+        super(GRPC_CONFIGS_XML);
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
deleted file mode 100644
index 70b1d70..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_USE_PLAINTEXT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_MAX_INBOUND_MSG_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_USE_PLAINTEXT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * gRPC server configuration manager for publishing openstack telemetry.
- */
-@Component(
-    immediate = true,
-    service = GrpcTelemetryConfigService.class,
-    property = {
-        PROP_GRPC_ENABLE_SERVICE + ":Boolean=" + GRPC_ENABLE_SERVICE_DEFAULT,
-        PROP_GRPC_SERVER_ADDRESS  + "=" + GRPC_SERVER_ADDRESS_DEFAULT,
-        PROP_GRPC_SERVER_PORT + ":Integer=" + GRPC_SERVER_PORT_DEFAULT,
-        PROP_GRPC_USE_PLAINTEXT + ":Boolean=" + GRPC_USE_PLAINTEXT_DEFAULT,
-        PROP_GRPC_MAX_INBOUND_MSG_SIZE + ":Integer=" + GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT
-    }
-)
-public class GrpcTelemetryConfigManager implements GrpcTelemetryConfigService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected GrpcTelemetryAdminService grpcTelemetryAdminService;
-
-    /** Default IP address to establish initial connection to gRPC server. */
-    protected String address = GRPC_SERVER_ADDRESS_DEFAULT;
-
-    /** Default port number to establish initial connection to gRPC server. */
-    protected Integer port = GRPC_SERVER_PORT_DEFAULT;
-
-    /** UsePlaintext flag value used for connecting to gRPC server. */
-    protected Boolean usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
-
-    /** Maximum inbound message size used for communicating with gRPC server. */
-    protected Integer maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-
-    /** Specify the default behavior of telemetry service. */
-    protected Boolean enableService = GRPC_ENABLE_SERVICE_DEFAULT;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        componentConfigService.registerProperties(getClass());
-
-        if (enableService) {
-            grpcTelemetryAdminService.start(getConfig());
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-
-        if (enableService) {
-            grpcTelemetryAdminService.stop();
-        }
-        log.info("Stopped");
-    }
-
-    @Modified
-    private void modified(ComponentContext context) {
-        readComponentConfiguration(context);
-        initTelemetryService(grpcTelemetryAdminService, getConfig(), enableService);
-        log.info("Modified");
-    }
-
-    @Override
-    public TelemetryConfig getConfig() {
-        return new DefaultGrpcTelemetryConfig.DefaultBuilder()
-                .withAddress(address)
-                .withPort(port)
-                .withUsePlaintext(usePlaintext)
-                .withMaxInboundMsgSize(maxInboundMsgSize)
-                .build();
-    }
-
-    /**
-     * Extracts properties from the component configuration context.
-     *
-     * @param context the component context
-     */
-    private void readComponentConfiguration(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-
-        String addressStr = get(properties, PROP_GRPC_SERVER_ADDRESS);
-        address = addressStr != null ? addressStr : GRPC_SERVER_ADDRESS_DEFAULT;
-        log.info("Configured. gRPC server address is {}", address);
-
-        Integer portConfigured = Tools.getIntegerProperty(properties, PROP_GRPC_SERVER_PORT);
-        if (portConfigured == null) {
-            port = GRPC_SERVER_PORT_DEFAULT;
-            log.info("gRPC server port is NOT configured, default value is {}", port);
-        } else {
-            port = portConfigured;
-            log.info("Configured. gRPC server port is {}", port);
-        }
-
-        Boolean usePlaintextConfigured =
-                getBooleanProperty(properties, PROP_GRPC_USE_PLAINTEXT);
-        if (usePlaintextConfigured == null) {
-            usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
-            log.info("gRPC server use plaintext flag is NOT " +
-                    "configured, default value is {}", usePlaintext);
-        } else {
-            usePlaintext = usePlaintextConfigured;
-            log.info("Configured. gRPC server use plaintext flag is {}", usePlaintext);
-        }
-
-        Integer maxInboundMsgSizeConfigured =
-                getIntegerProperty(properties, PROP_GRPC_MAX_INBOUND_MSG_SIZE);
-        if (maxInboundMsgSizeConfigured == null) {
-            maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-            log.info("gRPC server max inbound message size is NOT " +
-                    "configured, default value is {}", maxInboundMsgSize);
-        } else {
-            maxInboundMsgSize = maxInboundMsgSizeConfigured;
-            log.info("Configured. gRPC server max inbound message size is {}", maxInboundMsgSize);
-        }
-
-        Boolean enableServiceConfigured =
-                getBooleanProperty(properties, PROP_GRPC_ENABLE_SERVICE);
-        if (enableServiceConfigured == null) {
-            enableService = GRPC_ENABLE_SERVICE_DEFAULT;
-            log.info("gRPC service enable flag is NOT " +
-                    "configured, default value is {}", enableService);
-        } else {
-            enableService = enableServiceConfigured;
-            log.info("Configured. gRPC service enable flag is {}", enableService);
-        }
-    }
-
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index 391b1d8..f487504 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,12 +15,13 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Sets;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -29,6 +30,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
+
 /**
  * gRPC telemetry manager.
  */
@@ -40,7 +47,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private ManagedChannel channel = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
+    private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
 
     @Activate
     protected void activate() {
@@ -60,43 +70,43 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (channel != null) {
-            log.info("gRPC producer has already been started");
-            return;
-        }
+    public void start() {
+        telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
+            GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
 
-        GrpcTelemetryConfig grpcConfig = (GrpcTelemetryConfig) config;
-        channel = ManagedChannelBuilder
-                .forAddress(grpcConfig.address(), grpcConfig.port())
-                .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
-                .usePlaintext(grpcConfig.usePlaintext())
-                .build();
+            if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
+                ManagedChannel channel = ManagedChannelBuilder
+                        .forAddress(grpcConfig.address(), grpcConfig.port())
+                        .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+                        .usePlaintext(grpcConfig.usePlaintext())
+                        .build();
+
+                channels.add(channel);
+            }
+        });
 
         log.info("gRPC producer has Started");
     }
 
     @Override
     public void stop() {
-        if (channel != null) {
-            channel.shutdown();
-            channel = null;
-        }
-
+        channels.forEach(ManagedChannel::shutdown);
+        // Drop the shut-down channels so isRunning() turns false and restart() starts clean.
+        channels.clear();
         log.info("gRPC producer has Stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
     public Object publish(Object record) {
         // TODO: need to find a way to invoke gRPC endpoint using channel
 
-        if (channel == null) {
+        if (channels.isEmpty()) {
             log.debug("gRPC telemetry service has not been enabled!");
         }
 
@@ -105,6 +115,6 @@
 
     @Override
     public boolean isRunning() {
-        return channel != null;
+        return !channels.isEmpty();
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
new file mode 100644
index 0000000..215e1f6
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for InfluxDB telemetry configurations; passes {@code influxdb-configs.xml} to the base loader.
+ */
+@Component(immediate = true)
+public class InfluxDbConfigsLoader extends AbstractTelemetryConfigLoader {
+    public InfluxDbConfigsLoader() {
+        super("influxdb-configs.xml");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
deleted file mode 100644
index d9107ca..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * InfluxDB server configuration manager for publishing openstack telemetry.
- */
-@Component(
-    immediate = true,
-    service = InfluxDbTelemetryConfigService.class,
-    property = {
-        PROP_INFLUXDB_ENABLE_SERVICE + ":Boolean=" + PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT,
-        PROP_INFLUXDB_SERVER_ADDRESS + "=" + PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT,
-        PROP_INFLUXDB_SERVER_PORT + ":Integer=" + PROP_INFLUXDB_SERVER_PORT_DEFAULT,
-        PROP_INFLUXDB_USERNAME + "=" + PROP_INFLUXDB_USERNAME_DEFAULT,
-        PROP_INFLUXDB_PASSWORD + "=" + PROP_INFLUXDB_PASSWORD_DEFAULT,
-        PROP_INFLUXDB_DATABASE + "=" + PROP_INFLUXDB_DATABASE_DEFAULT,
-        PROP_INFLUXDB_MEASUREMENT + "=" + PROP_INFLUXDB_MEASUREMENT_DEFAULT,
-        PROP_INFLUXDB_ENABLE_BATCH + ":Boolean=" + PROP_INFLUXDB_ENABLE_BATCH_DEFAULT
-    }
-)
-public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected InfluxDbTelemetryAdminService influxDbTelemetryAdminService;
-
-    /** Default IP address to establish initial connection to InfluxDB server. */
-    protected String address = PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-
-    /** Default port number to establish initial connection to InfluxDB server. */
-    protected Integer port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-
-    /** Username used for authenticating against InfluxDB server. */
-    protected String username = PROP_INFLUXDB_USERNAME_DEFAULT;
-
-    /** Password used for authenticating against InfluxDB server. */
-    protected String password = PROP_INFLUXDB_PASSWORD_DEFAULT;
-
-    /** Database of InfluxDB server. */
-    protected String database = PROP_INFLUXDB_DATABASE_DEFAULT;
-
-    /** Measurement of InfluxDB server. */
-    protected String measurement = PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-
-    /** Flag value of enabling batch mode of InfluxDB server. */
-    protected Boolean enableBatch = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
-    /** Specify the default behavior of telemetry service. */
-    protected Boolean enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        componentConfigService.registerProperties(getClass());
-
-        if (enableService) {
-            influxDbTelemetryAdminService.start(getConfig());
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-
-        if (enableService) {
-            influxDbTelemetryAdminService.stop();
-        }
-        log.info("Stopped");
-    }
-
-    @Modified
-    private void modified(ComponentContext context) {
-        readComponentConfiguration(context);
-        initTelemetryService(influxDbTelemetryAdminService, getConfig(), enableService);
-        log.info("Modified");
-    }
-
-    @Override
-    public TelemetryConfig getConfig() {
-        return new DefaultInfluxDbTelemetryConfig.DefaultBuilder()
-                .withAddress(address)
-                .withPort(port)
-                .withUsername(username)
-                .withPassword(password)
-                .withDatabase(database)
-                .withMeasurement(measurement)
-                .withEnableBatch(enableBatch)
-                .build();
-    }
-
-    /**
-     * Extracts properties from the component configuration context.
-     *
-     * @param context the component context
-     */
-    private void readComponentConfiguration(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-
-        String addressStr = Tools.get(properties, PROP_INFLUXDB_SERVER_ADDRESS);
-        address = addressStr != null ? addressStr : PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-        log.info("Configured. InfluxDB server address is {}", address);
-
-        Integer portConfigured = Tools.getIntegerProperty(properties, PROP_INFLUXDB_SERVER_PORT);
-        if (portConfigured == null) {
-            port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-            log.info("InfluxDB server port is NOT configured, default value is {}", port);
-        } else {
-            port = portConfigured;
-            log.info("Configured. InfluxDB server port is {}", port);
-        }
-
-        String usernameStr = Tools.get(properties, PROP_INFLUXDB_USERNAME);
-        username = usernameStr != null ? usernameStr : PROP_INFLUXDB_USERNAME_DEFAULT;
-        log.info("Configured. InfluxDB server username is {}", username);
-
-        String passwordStr = Tools.get(properties, PROP_INFLUXDB_PASSWORD);
-        password = passwordStr != null ? passwordStr : PROP_INFLUXDB_PASSWORD_DEFAULT;
-        log.info("Configured. InfluxDB server password is {}", password);
-
-        String databaseStr = Tools.get(properties, PROP_INFLUXDB_DATABASE);
-        database = databaseStr != null ? databaseStr : PROP_INFLUXDB_DATABASE_DEFAULT;
-        log.info("Configured. InfluxDB server database is {}", database);
-
-        String measurementStr = Tools.get(properties, PROP_INFLUXDB_MEASUREMENT);
-        measurement = measurementStr != null ? measurementStr : PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-        log.info("Configured. InfluxDB server measurement is {}", measurement);
-
-        Boolean enableBatchConfigured = getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_BATCH);
-        if (enableBatchConfigured == null) {
-            enableBatch = PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
-            log.info("InfluxDB server enable batch flag is " +
-                    "NOT configured, default value is {}", enableBatch);
-        } else {
-            enableBatch = enableBatchConfigured;
-            log.info("Configured. InfluxDB server enable batch is {}", enableBatch);
-        }
-
-        Boolean enableServiceConfigured =
-                getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_SERVICE);
-        if (enableServiceConfigured == null) {
-            enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-            log.info("InfluxDB service enable flag is NOT " +
-                    "configured, default value is {}", enableService);
-        } else {
-            enableService = enableServiceConfigured;
-            log.info("Configured. InfluxDB service enable flag is {}", enableService);
-        }
-    }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 40528d4..796da19 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -24,6 +24,7 @@
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.InfluxRecord;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -34,8 +35,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.Set;
 
+import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
+
 /**
  * InfluxDB telemetry manager.
  */
@@ -70,10 +76,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
     private static final String INFLUX_PROTOCOL = "http";
-    private InfluxDB producer = null;
-    private String database = null;
-    private String measurement = null;
+    private Map<String, InfluxDB> producers = new java.util.concurrent.ConcurrentHashMap<>();
 
     @Activate
     protected void activate() {
@@ -93,51 +100,51 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (producer != null) {
-            log.info("InfluxDB producer has already been started");
-            return;
-        }
+    public void start() {
 
-        InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
+        telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
+            InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
 
-        StringBuilder influxDbServerBuilder = new StringBuilder();
-        influxDbServerBuilder.append(INFLUX_PROTOCOL);
-        influxDbServerBuilder.append(":");
-        influxDbServerBuilder.append("//");
-        influxDbServerBuilder.append(influxDbConfig.address());
-        influxDbServerBuilder.append(":");
-        influxDbServerBuilder.append(influxDbConfig.port());
+            if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
+                StringBuilder influxDbServerBuilder = new StringBuilder();
+                influxDbServerBuilder.append(INFLUX_PROTOCOL);
+                influxDbServerBuilder.append(":");
+                influxDbServerBuilder.append("//");
+                influxDbServerBuilder.append(influxDbConfig.address());
+                influxDbServerBuilder.append(":");
+                influxDbServerBuilder.append(influxDbConfig.port());
 
-        producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
-                influxDbConfig.username(), influxDbConfig.password());
-        database = influxDbConfig.database();
-        measurement = influxDbConfig.measurement();
+                InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+                        influxDbConfig.username(), influxDbConfig.password());
+                producers.put(c.name(), producer);
+
+                createDB(producer, influxDbConfig.database());
+            }
+        });
 
         log.info("InfluxDB producer has Started");
-
-        createDB();
     }
 
     @Override
     public void stop() {
-        if (producer != null) {
-            producer.close();
-            producer = null;
+        if (producers != null) {
+            producers.values().forEach(InfluxDB::close);
+            // Drop the closed clients so isRunning()/publish() stop treating them as live.
+            producers.clear();
         }
 
         log.info("InfluxDB producer has stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
     public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
-        if (producer == null) {
+        if (producers == null || producers.isEmpty()) {
             log.debug("InfluxDB telemetry service has not been enabled!");
             return;
         }
@@ -149,53 +156,61 @@
 
         log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
 
-        BatchPoints batchPoints = BatchPoints.database(database).build();
+        producers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
 
-        for (FlowInfo flowInfo: record.flowInfos()) {
-            Point.Builder pointBuilder = Point
-                    .measurement((measurement == null) ? record.measurement() : measurement)
-                    .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
-                    .tag(DEVICE_ID, flowInfo.deviceId().toString())
-                    .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
-                    .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
-                    .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
-                    .tag(SRC_IP, flowInfo.srcIp().toString())
-                    .tag(DST_IP, flowInfo.dstIp().toString())
-                    .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
-                    .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
-                    .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
-                    .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
-                    .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
-                    .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
-                    .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
-                    .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
-                    .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
-                    .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
-                    .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+            String database = influxDbConfig.database();
+            String measurement = influxDbConfig.measurement();
 
-            if (flowInfo.vlanId() != null) {
-                pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+            BatchPoints batchPoints = BatchPoints.database(database).build();
+
+            for (FlowInfo flowInfo: record.flowInfos()) {
+                Point.Builder pointBuilder = Point
+                        .measurement((measurement == null) ? record.measurement() : measurement)
+                        .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+                        .tag(DEVICE_ID, flowInfo.deviceId().toString())
+                        .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+                        .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+                        .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+                        .tag(SRC_IP, flowInfo.srcIp().toString())
+                        .tag(DST_IP, flowInfo.dstIp().toString())
+                        .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+                        .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+                        .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+                        .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+                        .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+                        .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+                        .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+                        .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+                        .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+                        .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+                        .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+
+                if (flowInfo.vlanId() != null) {
+                    pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+                }
+
+                if (flowInfo.srcPort() != null) {
+                    pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
+                }
+
+                if (flowInfo.dstPort() != null) {
+                    pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
+                }
+
+                batchPoints.point(pointBuilder.build());
             }
-
-            if (flowInfo.srcPort() != null) {
-                pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
-            }
-
-            if (flowInfo.dstPort() != null) {
-                pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
-            }
-
-            batchPoints.point(pointBuilder.build());
-        }
-        producer.write(batchPoints);
+            v.write(batchPoints);
+        });
     }
 
     @Override
     public boolean isRunning() {
-        return producer != null;
+        return !producers.isEmpty();
     }
 
-    private void createDB() {
+    private void createDB(InfluxDB producer, String database) {
         if (producer.databaseExists(database)) {
             log.debug("Database {} is already created", database);
         } else {
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
new file mode 100644
index 0000000..1eca176
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for Kafka telemetry configurations; passes {@code kafka-configs.xml} to the base loader.
+ */
+@Component(immediate = true)
+public class KafkaConfigsLoader extends AbstractTelemetryConfigLoader {
+    public KafkaConfigsLoader() {
+        super("kafka-configs.xml");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
deleted file mode 100644
index 6621e0b..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Kafka server configuration manager for publishing openstack telemetry.
- */
-@Component(
-    immediate = true,
-    service = KafkaTelemetryConfigService.class,
-    property = {
-        PROP_KAFKA_ADDRESS + "=" + PROP_KAFKA_ADDRESS_DEFAULT,
-        PROP_KAFKA_PORT + ":Integer=" + PROP_KAFKA_PORT_DEFAULT,
-        PROP_KAFKA_RETRIES + ":Integer=" + PROP_KAFKA_RETRIES_DEFAULT,
-        PROP_KAFKA_REQUIRED_ACKS + "=" + PROP_KAFKA_REQUIRED_ACKS_DEFAULT,
-        PROP_KAFKA_BATCH_SIZE + ":Integer=" + PROP_KAFKA_BATCH_SIZE_DEFAULT,
-        PROP_KAFKA_LINGER_MS + ":Integer=" + PROP_KAFKA_LINGER_MS_DEFAULT,
-        PROP_KAFKA_MEMORY_BUFFER + ":Integer=" + PROP_KAFKA_MEMORY_BUFFER_DEFAULT,
-        PROP_KAFKA_KEY_SERIALIZER + "=" + PROP_KAFKA_KEY_SERIALIZER_DEFAULT,
-        PROP_KAFKA_VALUE_SERIALIZER + "=" + PROP_KAFKA_VALUE_SERIALIZER_DEFAULT,
-        PROP_KAFKA_ENABLE_SERVICE + ":Boolean=" + PROP_KAFKA_ENABLE_SERVICE_DEFAULT
-    }
-)
-public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
-
-    /** Default IP address to establish initial connection to Kafka server. */
-    protected String address = PROP_KAFKA_ADDRESS_DEFAULT;
-
-    /** Default port number to establish initial connection to Kafka server. */
-    protected Integer port = PROP_KAFKA_PORT_DEFAULT;
-
-    /** Number of times the producer can retry to send after first failure. */
-    protected int retries = PROP_KAFKA_RETRIES_DEFAULT;
-
-    /** Producer will get an acknowledgement after the leader has replicated the data. */
-    protected String requiredAcks = PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-
-    /** The largest record batch size allowed by Kafka. */
-    protected Integer batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
-
-    /** The producer groups together any records that arrive between request transmissions into a single batch. */
-    protected Integer lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
-
-    /** The total memory used for log cleaner I/O buffers across all cleaner threads. */
-    protected Integer memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-
-    /** Serializer class for key that implements the Serializer interface. */
-    protected String keySerializer = PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-
-    /** Serializer class for value that implements the Serializer interface. */
-    protected String valueSerializer = PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-
-    /** Specify the default behavior of telemetry service. */
-    protected Boolean enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        componentConfigService.registerProperties(getClass());
-
-        if (enableService) {
-            kafkaTelemetryAdminService.start(getConfig());
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-
-        if (enableService) {
-            kafkaTelemetryAdminService.stop();
-        }
-        log.info("Stopped");
-    }
-
-    @Modified
-    private void modified(ComponentContext context) {
-        readComponentConfiguration(context);
-        initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
-        log.info("Modified");
-    }
-
-    @Override
-    public TelemetryConfig getConfig() {
-        return new DefaultKafkaTelemetryConfig.DefaultBuilder()
-                .withAddress(address)
-                .withPort(port)
-                .withRetries(retries)
-                .withRequiredAcks(requiredAcks)
-                .withBatchSize(batchSize)
-                .withLingerMs(lingerMs)
-                .withMemoryBuffer(memoryBuffer)
-                .withKeySerializer(keySerializer)
-                .withValueSerializer(valueSerializer)
-                .build();
-    }
-
-    /**
-     * Extracts properties from the component configuration context.
-     *
-     * @param context the component context
-     */
-    private void readComponentConfiguration(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-
-        String addressStr = Tools.get(properties, PROP_KAFKA_ADDRESS);
-        address = addressStr != null ? addressStr : PROP_KAFKA_ADDRESS_DEFAULT;
-        log.info("Configured. Kafka server address is {}", address);
-
-        Integer portConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_PORT);
-        if (portConfigured == null) {
-            port = PROP_KAFKA_PORT_DEFAULT;
-            log.info("Kafka server port is NOT configured, default value is {}", port);
-        } else {
-            port = portConfigured;
-            log.info("Configured. Kafka server port is {}", port);
-        }
-
-        Integer retriesConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_RETRIES);
-        if (retriesConfigured == null) {
-            retries = PROP_KAFKA_RETRIES_DEFAULT;
-            log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
-        } else {
-            retries = retriesConfigured;
-            log.info("Configured. Kafka number of retries is {}", retries);
-        }
-
-        String requiredAcksStr = Tools.get(properties, PROP_KAFKA_REQUIRED_ACKS);
-        requiredAcks = requiredAcksStr != null ? requiredAcksStr : PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-        log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
-
-        Integer batchSizeConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_BATCH_SIZE);
-        if (batchSizeConfigured == null) {
-            batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
-            log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
-        } else {
-            batchSize = batchSizeConfigured;
-            log.info("Configured. Kafka batch size is {}", batchSize);
-        }
-
-        Integer lingerMsConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_LINGER_MS);
-        if (lingerMsConfigured == null) {
-            lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
-            log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
-        } else {
-            lingerMs = lingerMsConfigured;
-            log.info("Configured. Kafka lingerMs is {}", lingerMs);
-        }
-
-        Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_MEMORY_BUFFER);
-        if (memoryBufferConfigured == null) {
-            memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-            log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
-        } else {
-            memoryBuffer = memoryBufferConfigured;
-            log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
-        }
-
-        String keySerializerStr = Tools.get(properties, PROP_KAFKA_KEY_SERIALIZER);
-        keySerializer = keySerializerStr != null ? keySerializerStr : PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-        log.info("Configured, Kafka key serializer is {}", keySerializer);
-
-        String valueSerializerStr = Tools.get(properties, PROP_KAFKA_VALUE_SERIALIZER);
-        valueSerializer = valueSerializerStr != null ? valueSerializerStr : PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-        log.info("Configured, Kafka value serializer is {}", valueSerializer);
-
-        Boolean enableServiceConfigured =
-                getBooleanProperty(properties, PROP_KAFKA_ENABLE_SERVICE);
-        if (enableServiceConfigured == null) {
-            enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-            log.info("Kafka service enable flag is NOT " +
-                    "configured, default value is {}", enableService);
-        } else {
-            enableService = enableServiceConfigured;
-            log.info("Configured. Kafka service enable flag is {}", enableService);
-        }
-    }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e895e46..85f36e3 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,12 +15,17 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -31,9 +36,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.Future;
 
+import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+
 /**
  * Kafka telemetry manager.
  */
@@ -42,6 +54,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
+
     private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
     private static final String RETRIES = "retries";
     private static final String ACKS = "acks";
@@ -54,7 +68,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private Producer<String, byte[]> producer = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
+    private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -74,64 +91,83 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (producer != null) {
-            log.info("Kafka producer has already been started");
-            return;
-        }
+    public void start() {
+        // Creates one producer per enabled KAFKA config, skipping the config named KAFKA_SCHEME
+        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
+            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
 
-        KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
+            if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
+                Properties prop = new Properties();
+                prop.put(BOOTSTRAP_SERVERS, kafkaConfig.address() + ":" + kafkaConfig.port());
 
-        StringBuilder kafkaServerBuilder = new StringBuilder();
-        kafkaServerBuilder.append(kafkaConfig.address());
-        kafkaServerBuilder.append(":");
-        kafkaServerBuilder.append(kafkaConfig.port());
+                prop.put(RETRIES, kafkaConfig.retries());
+                prop.put(ACKS, kafkaConfig.requiredAcks());
+                prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+                prop.put(LINGER_MS, kafkaConfig.lingerMs());
+                prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+                prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+                prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
 
-        // Configure Kafka server properties
-        Properties prop = new Properties();
-        prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
-        prop.put(RETRIES, kafkaConfig.retries());
-        prop.put(ACKS, kafkaConfig.requiredAcks());
-        prop.put(BATCH_SIZE, kafkaConfig.batchSize());
-        prop.put(LINGER_MS, kafkaConfig.lingerMs());
-        prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
-        prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
-        prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+                // put() replaces any producer already stored under this name; close it or it leaks
+                Producer<String, byte[]> previous = producers.put(c.name(), new KafkaProducer<>(prop));
+                if (previous != null) {
+                    previous.close();
+                }
+            }
+        });
 
-        producer = new KafkaProducer<>(prop);
         log.info("Kafka producer has Started");
     }
 
     @Override
     public void stop() {
-        if (producer != null) {
-            producer.close();
-            producer = null;
+        // close every registered producer (a no-op loop when none exist), then drop them all
+        for (Producer<String, byte[]> kafkaProducer : producers.values()) {
+            kafkaProducer.close();
         }
 
+        producers.clear();
         log.info("Kafka producer has Stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start(); // rebuilds the producers from telemetryConfigService's current KAFKA configs
     }
 
     @Override
-    public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+    public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
-        if (producer == null) {
+        if (producers == null || producers.isEmpty()) {
             log.debug("Kafka telemetry service has not been enabled!");
             return null;
         }
 
         log.debug("Send telemetry record to kafka server...");
-        return producer.send(record);
+        Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
+        producers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+            if (kafkaConfig == null) {
+                // fromTelemetryConfig can yield null (start() checks for it too); skip, do not NPE
+                log.warn("No kafka telemetry config available for producer {}", k);
+                return;
+            }
+            try {
+                Class<?> codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
+                ByteBuffer buffer = ((TelemetryCodec) codecClazz.newInstance()).encode(flowInfos);
+                futureSet.add(v.send(new ProducerRecord<>(
+                        kafkaConfig.topic(), kafkaConfig.key(), buffer.array())));
+            } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+                log.warn("Failed to send telemetry record via kafka producer {}", k, e);
+            }
+        });
+        return futureSet;
     }
 
     @Override
     public boolean isRunning() {
-        return producer != null;
+        return !producers.isEmpty(); // running means at least one producer is registered
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 783dbd6..ce325cd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -17,8 +17,6 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -26,8 +24,10 @@
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.PrometheusTelemetryService;
 import org.onosproject.openstacktelemetry.api.RestTelemetryService;
-import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -36,14 +36,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getPropertyValueAsBoolean;
 
 /**
  * Openstack telemetry manager.
@@ -53,63 +49,57 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final String ENABLE_SERVICE = "enableService";
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
+    protected TelemetryConfigService telemetryConfigService;
 
-    private List<TelemetryService> telemetryServices = Lists.newArrayList();
+    private List<TelemetryAdminService> telemetryServices = Lists.newArrayList();
+    private InternalTelemetryConfigListener
+                        configListener = new InternalTelemetryConfigListener();
 
     @Activate
     protected void activate() {
+        // register the listener so telemetry config events reach InternalTelemetryConfigListener
+        telemetryConfigService.addListener(configListener);
         log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
+        // unregister the listener that activate() added to telemetryConfigService
+        telemetryConfigService.removeListener(configListener);
         log.info("Stopped");
     }
 
     @Override
-    public void addTelemetryService(TelemetryService telemetryService) {
+    public void addTelemetryService(TelemetryAdminService telemetryService) {
         telemetryServices.add(telemetryService);
     }
 
     @Override
-    public void removeTelemetryService(TelemetryService telemetryService) {
+    public void removeTelemetryService(TelemetryAdminService telemetryService) {
         telemetryServices.remove(telemetryService);
     }
 
     @Override
     public void publish(Set<FlowInfo> flowInfos) {
         telemetryServices.forEach(service -> {
-            if (service instanceof GrpcTelemetryManager &&
-                    getPropertyValueAsBoolean(componentConfigService.getProperties(
-                            GrpcTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+            if (service instanceof GrpcTelemetryManager) {
                 invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
             }
 
-            if (service instanceof InfluxDbTelemetryManager &&
-                    getPropertyValueAsBoolean(componentConfigService.getProperties(
-                            InfluxDbTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+            if (service instanceof InfluxDbTelemetryManager) {
                 invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
             }
 
-            if (service instanceof PrometheusTelemetryManager &&
-                    getPropertyValueAsBoolean(componentConfigService.getProperties(
-                            PrometheusTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+            if (service instanceof PrometheusTelemetryManager) {
                 invokePrometheusPublisher((PrometheusTelemetryService) service, flowInfos);
             }
 
-            if (service instanceof KafkaTelemetryManager &&
-                    getPropertyValueAsBoolean(componentConfigService.getProperties(
-                            KafkaTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+            if (service instanceof KafkaTelemetryManager) {
                 invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
             }
 
-            if (service instanceof RestTelemetryManager &&
-                    getPropertyValueAsBoolean(componentConfigService.getProperties(
-                            RestTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+            if (service instanceof RestTelemetryManager) {
                 invokeRestPublisher((RestTelemetryService) service, flowInfos);
             }
 
@@ -118,31 +108,55 @@
     }
 
     @Override
-    public Set<TelemetryService> telemetryServices() {
+    public Set<TelemetryAdminService> telemetryServices() {
         return ImmutableSet.copyOf(telemetryServices);
     }
 
-    private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
+    @Override
+    public TelemetryAdminService telemetryService(String type) {
+        // first registered service whose type name equals the given one, ignoring case; else null
+        return telemetryServices.stream()
+                .filter(candidate -> candidate.type().name().equalsIgnoreCase(type))
+                .findFirst().orElse(null);
+    }
+
+    private void invokeGrpcPublisher(GrpcTelemetryService service,
+                                     Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
 
-    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
+    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service,
+                                         Set<FlowInfo> flowInfos) {
         DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
                 = new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
         service.publish(influxRecord);
     }
 
-    private void invokePrometheusPublisher(PrometheusTelemetryService service, Set<FlowInfo> flowInfos) {
+    private void invokePrometheusPublisher(PrometheusTelemetryService service,
+                                           Set<FlowInfo> flowInfos) {
         service.publish(flowInfos);
     }
 
-    private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
-        TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
-        ByteBuffer buffer = codec.encode(flowInfos);
-        service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
+    private void invokeKafkaPublisher(KafkaTelemetryService service,
+                                      Set<FlowInfo> flowInfos) {
+        service.publish(flowInfos);
     }
 
-    private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
+    private void invokeRestPublisher(RestTelemetryService service,
+                                     Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
+
+    /** Restarts the registered telemetry service whose type matches the event subject's type. */
+    private class InternalTelemetryConfigListener implements TelemetryConfigListener {
+        @Override
+        public void event(TelemetryConfigEvent event) {
+            String typeName = event.subject().type().name();
+            TelemetryAdminService target = telemetryService(typeName);
+            if (target == null) {
+                return; // no registered service of this type, nothing to restart
+            }
+            target.restart();
+        }
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
index 325bce2..cc754f5 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
@@ -23,65 +23,7 @@
     private OsgiPropertyConstants() {
     }
 
-    // REST telemetry
-
-    static final String PROP_REST_ENABLE_SERVICE = "enableService";
-    static final boolean PROP_REST_ENABLE_SERVICE_DEFAULT = false;
-
-    static final String PROP_REST_SERVER_ADDRESS = "address";
-    static final String PROP_REST_SERVER_ADDRESS_DEFAULT = "localhost";
-
-    static final String PROP_REST_SERVER_PORT = "port";
-    static final int PROP_REST_SERVER_PORT_DEFAULT = 80;
-
-    static final String PROP_REST_ENDPOINT = "endpoint";
-    static final String PROP_REST_ENDPOINT_DEFAULT = "telemetry";
-
-    static final String PROP_REST_METHOD = "method";
-    static final String PROP_REST_METHOD_DEFAULT = "POST";
-
-    static final String PROP_REST_REQUEST_MEDIA_TYPE = "requestMediaType";
-    static final String PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT = "application/json";
-
-    static final String PROP_REST_RESPONSE_MEDIA_TYPE = "responseMediaType";
-    static final String PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT = "application/json";
-
-    // Kafka telemetry
-
-    static final String PROP_KAFKA_ENABLE_SERVICE = "enableService";
-    static final boolean PROP_KAFKA_ENABLE_SERVICE_DEFAULT = false;
-
-    static final String PROP_KAFKA_ADDRESS = "address";
-    static final String PROP_KAFKA_ADDRESS_DEFAULT = "localhost";
-
-    static final String PROP_KAFKA_PORT = "port";
-    static final int PROP_KAFKA_PORT_DEFAULT = 9092;
-
-    static final String PROP_KAFKA_RETRIES = "retries";
-    static final int PROP_KAFKA_RETRIES_DEFAULT = 0;
-
-    static final String PROP_KAFKA_REQUIRED_ACKS = "requiredAcks";
-    static final String PROP_KAFKA_REQUIRED_ACKS_DEFAULT = "all";
-
-    static final String PROP_KAFKA_BATCH_SIZE = "batchSize";
-    static final int PROP_KAFKA_BATCH_SIZE_DEFAULT = 16384;
-
-    static final String PROP_KAFKA_LINGER_MS = "lingerMs";
-    static final int PROP_KAFKA_LINGER_MS_DEFAULT = 1;
-
-    static final String PROP_KAFKA_MEMORY_BUFFER = "memoryBuffer";
-    static final int PROP_KAFKA_MEMORY_BUFFER_DEFAULT = 33554432;
-
-    static final String PROP_KAFKA_KEY_SERIALIZER = "keySerializer";
-    static final String PROP_KAFKA_KEY_SERIALIZER_DEFAULT =
-        "org.apache.kafka.common.serialization.StringSerializer";
-
-    static final String PROP_KAFKA_VALUE_SERIALIZER = "valueSerializer";
-    static final String PROP_KAFKA_VALUE_SERIALIZER_DEFAULT =
-        "org.apache.kafka.common.serialization.ByteArraySerializer";
-
     // Stats flow rule manager
-
     static final String PROP_REVERSE_PATH_STATS = "reversePathStats";
     static final boolean PROP_REVERSE_PATH_STATS_DEFAULT = false;
 
@@ -96,56 +38,4 @@
 
     static final String PROP_MONITOR_UNDERLAY = "monitorUnderlay";
     static final boolean PROP_MONITOR_UNDERLAY_DEFAULT = true;
-
-    // Influx DB Telemetry config manager
-
-    static final String PROP_INFLUXDB_ENABLE_SERVICE = "enableService";
-    static final boolean PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT = false;
-
-    static final String PROP_INFLUXDB_SERVER_ADDRESS = "address";
-    static final String PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT = "localhost";
-
-    static final String PROP_INFLUXDB_SERVER_PORT = "port";
-    static final int PROP_INFLUXDB_SERVER_PORT_DEFAULT = 8086;
-
-    static final String PROP_INFLUXDB_USERNAME = "username";
-    static final String PROP_INFLUXDB_USERNAME_DEFAULT = "onos";
-
-    static final String PROP_INFLUXDB_PASSWORD = "password";
-    static final String PROP_INFLUXDB_PASSWORD_DEFAULT = "onos";
-
-    static final String PROP_INFLUXDB_DATABASE = "database";
-    static final String PROP_INFLUXDB_DATABASE_DEFAULT = "onos";
-
-    static final String PROP_INFLUXDB_MEASUREMENT = "measurement";
-    static final String PROP_INFLUXDB_MEASUREMENT_DEFAULT = "sonaflow";
-
-    static final String PROP_INFLUXDB_ENABLE_BATCH = "enableBatch";
-    static final boolean PROP_INFLUXDB_ENABLE_BATCH_DEFAULT = true;
-
-    // GRPC Telemetry config manager
-    static final String PROP_GRPC_ENABLE_SERVICE = "enableService";
-    static final boolean GRPC_ENABLE_SERVICE_DEFAULT = false;
-
-    static final String PROP_GRPC_SERVER_ADDRESS = "address";
-    static final String GRPC_SERVER_ADDRESS_DEFAULT = "localhost";
-
-    static final String PROP_GRPC_SERVER_PORT = "port";
-    static final int GRPC_SERVER_PORT_DEFAULT = 50051;
-
-    static final String PROP_GRPC_USE_PLAINTEXT = "usePlaintext";
-    static final boolean GRPC_USE_PLAINTEXT_DEFAULT = true;
-
-    static final String PROP_GRPC_MAX_INBOUND_MSG_SIZE = "maxInboundMsgSize";
-    static final int GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT = 4194304; //4 * 1024 * 1024;
-
-    // Prometheus Telemetry config manager
-    static final String PROP_PROMETHEUS_ENABLE_SERVICE = "enableService";
-    static final boolean PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT = true;
-
-    static final String PROP_PROMETHEUS_EXPORTER_ADDRESS = "address";
-    public static final String PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT = "localhost";
-
-    static final String PROP_PROMETHEUS_EXPORTER_PORT = "port";
-    public static final int PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT = 9555;
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
new file mode 100644
index 0000000..a7b740d
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for prometheus telemetry configurations.
+ */
+@Component(immediate = true)
+public class PrometheusConfigsLoader extends AbstractTelemetryConfigLoader {
+    public PrometheusConfigsLoader() {
+        super("prometheus-configs.xml");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
deleted file mode 100644
index dcc7023..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Prometheus exporter configuration manager for publishing openstack telemetry.
- */
-@Component(
-    immediate = true,
-    service = PrometheusTelemetryConfigService.class,
-    property = {
-        PROP_PROMETHEUS_ENABLE_SERVICE + ":Boolean=" + PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT,
-        PROP_PROMETHEUS_EXPORTER_ADDRESS + "=" + PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT,
-        PROP_PROMETHEUS_EXPORTER_PORT + ":Integer=" + PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT
-    }
-)
-public class PrometheusTelemetryConfigManager implements PrometheusTelemetryConfigService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected PrometheusTelemetryAdminService prometheusTelemetryAdminService;
-
-    /** Default IP address of prometheus exporter. */
-    protected String address = PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-
-    /** Default port number of prometheus exporter. */
-    protected Integer port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-
-    /** Specify the default behavior of telemetry service. */
-    protected Boolean enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        componentConfigService.registerProperties(getClass());
-        if (enableService) {
-            prometheusTelemetryAdminService.start(getConfig());
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-        if (enableService) {
-            prometheusTelemetryAdminService.stop();
-        }
-        log.info("Stopped");
-    }
-
-    @Modified
-    private void modified(ComponentContext context) {
-        readComponentConfiguration(context);
-        initTelemetryService(prometheusTelemetryAdminService, getConfig(), enableService);
-        log.info("Modified");
-    }
-
-    @Override
-    public TelemetryConfig getConfig() {
-        return new DefaultPrometheusTelemetryConfig.DefaultBuilder()
-                .withAddress(address)
-                .withPort(port)
-                .build();
-    }
-
-    /**
-     * Extracts properties from the component configuration context.
-     *
-     * @param context the component context
-     */
-    private void readComponentConfiguration(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-
-        String addressStr = Tools.get(properties, PROP_PROMETHEUS_EXPORTER_ADDRESS);
-        address = addressStr != null ? addressStr : PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-        log.info("Configured. Prometheus exporter address is {}", address);
-
-        Integer portConfigured = Tools.getIntegerProperty(properties, PROP_PROMETHEUS_EXPORTER_PORT);
-        if (portConfigured == null) {
-            port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-            log.info("Prometheus exporter port is NOT configured, default value is {}", port);
-        } else {
-            port = portConfigured;
-            log.info("Configured. Prometheus exporter port is {}", port);
-        }
-
-        Boolean enableServiceConfigured = getBooleanProperty(properties, PROP_PROMETHEUS_ENABLE_SERVICE);
-        if (enableServiceConfigured == null) {
-            enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-            log.info("Prometheus service enable flag is NOT " +
-                             "configured, default value is {}", enableService);
-        } else {
-            enableService = enableServiceConfigured;
-            log.info("Configured. Prometheus service enable flag is {}", enableService);
-        }
-    }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
index 69423ca..7111fc0 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
@@ -15,13 +15,19 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Sets;
+import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
 import org.onlab.packet.TpPort;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -30,15 +36,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.prometheus.client.Counter;
-import io.prometheus.client.exporter.MetricsServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
 import java.util.Arrays;
 import java.util.Set;
 
+import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
+
 /**
  * Prometheus telemetry manager.
  */
@@ -47,7 +51,7 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private Server prometheusExporter;
+    private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
 
     private static final String FLOW_TYPE = "flowType";
     private static final String DEVICE_ID = "deviceId";
@@ -134,6 +138,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
     @Activate
     protected void activate() {
         openstackTelemetryService.addTelemetryService(this);
@@ -148,38 +155,51 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
+    public void start() {
         log.info("Prometheus exporter starts.");
 
-        PrometheusTelemetryConfig prometheusConfig = (PrometheusTelemetryConfig) config;
+        telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
+            PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
 
-        try {
-            prometheusExporter = new Server(prometheusConfig.port());
-            ServletContextHandler context = new ServletContextHandler();
-            context.setContextPath("/");
-            prometheusExporter.setHandler(context);
-            context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
-            log.info("Prometeus server start (Server port:{})", prometheusConfig.port());
-            prometheusExporter.start();
-        } catch (Exception ex) {
-            log.warn("Exception: {}", ex.toString());
-        }
+            if (prometheusConfig != null &&
+                    !c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
+                try {
+                    // TODO  Offer a 'Authentication'
+                    Server prometheusExporter = new Server(prometheusConfig.port());
+                    ServletContextHandler context = new ServletContextHandler();
+                    context.setContextPath("/");
+                    prometheusExporter.setHandler(context);
+                    context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+                    log.info("Prometheus server start (Server port:{})", prometheusConfig.port());
+
+                    prometheusExporter.start();
+                    // only exporters that started without error are remembered for stop()
+                    prometheusExporters.add(prometheusExporter);
+
+                } catch (Exception ex) {
+                    log.warn("Failed to start prometheus exporter {}", c.name(), ex);
+                }
+            }
+        });
     }
 
     @Override
     public void stop() {
-        try {
-            prometheusExporter.stop();
-        } catch (Exception ex) {
-            log.warn("Exception: {}", ex.toString());
-        }
+        prometheusExporters.forEach(pe -> {
+            try {
+                pe.stop();
+            } catch (Exception e) {
+                log.warn("Failed to stop prometheus server", e); // throwable last: logged with stack trace
+            }
+        });
         log.info("Prometheus exporter has stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
@@ -247,6 +267,6 @@
 
     @Override
     public boolean isRunning() {
-        return prometheusExporter.isRunning();
+        return prometheusExporters.stream().anyMatch(Server::isRunning); // stopped servers stay in the set
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
new file mode 100644
index 0000000..ffe6cbc
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for REST telemetry configurations.
+ */
+@Component(immediate = true)
+public class RestConfigsLoader extends AbstractTelemetryConfigLoader {
+    public RestConfigsLoader() {
+        super("rest-configs.xml");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
deleted file mode 100644
index 16b62cb..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * REST server configuration manager for publishing openstack telemetry.
- */
-@Component(
-    immediate = true,
-    service = RestTelemetryConfigService.class,
-    property = {
-        PROP_REST_ENABLE_SERVICE + ":Boolean=" + PROP_REST_ENABLE_SERVICE_DEFAULT,
-        PROP_REST_SERVER_ADDRESS + "=" + PROP_REST_SERVER_ADDRESS_DEFAULT,
-        PROP_REST_SERVER_PORT + ":Integer=" + PROP_REST_SERVER_PORT_DEFAULT,
-        PROP_REST_ENDPOINT + "=" + PROP_REST_ENDPOINT_DEFAULT,
-        PROP_REST_METHOD + "=" + PROP_REST_METHOD_DEFAULT,
-        PROP_REST_REQUEST_MEDIA_TYPE + "=" + PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT,
-        PROP_REST_RESPONSE_MEDIA_TYPE + "=" + PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT
-    }
-)
-public class RestTelemetryConfigManager implements RestTelemetryConfigService {
-
-    private final Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected RestTelemetryAdminService restTelemetryAdminService;
-
-    /** Default IP address to establish initial connection to REST server. */
-    protected String address = PROP_REST_SERVER_ADDRESS_DEFAULT;
-
-    /** Default port number to establish initial connection to REST server. */
-    protected Integer port = PROP_REST_SERVER_PORT_DEFAULT;
-
-    /** Endpoint of REST server. */
-    protected String endpoint = PROP_REST_ENDPOINT_DEFAULT;
-
-    /** HTTP method of REST server. */
-    protected String method = PROP_REST_METHOD_DEFAULT;
-
-    /** Request media type of REST server. */
-    protected String requestMediaType = PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-
-    /** Response media type of REST server. */
-    protected String responseMediaType = PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-
-    /** Specify the default behavior of telemetry service. */
-    protected Boolean enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
-
-    @Activate
-    protected void activate(ComponentContext context) {
-        componentConfigService.registerProperties(getClass());
-
-        if (enableService) {
-            restTelemetryAdminService.start(getConfig());
-        }
-        log.info("Started");
-    }
-
-    @Deactivate
-    protected void deactivate() {
-        componentConfigService.unregisterProperties(getClass(), false);
-
-        if (enableService) {
-            restTelemetryAdminService.stop();
-        }
-        log.info("Stopped");
-    }
-
-    @Modified
-    private void modified(ComponentContext context) {
-        readComponentConfiguration(context);
-        initTelemetryService(restTelemetryAdminService, getConfig(), enableService);
-        log.info("Modified");
-    }
-
-    @Override
-    public TelemetryConfig getConfig() {
-        return new DefaultRestTelemetryConfig.DefaultBuilder()
-                .withAddress(address)
-                .withPort(port)
-                .withEndpoint(endpoint)
-                .withMethod(method)
-                .withRequestMediaType(requestMediaType)
-                .withResponseMediaType(responseMediaType)
-                .build();
-    }
-
-    /**
-     * Extracts properties from the component configuration context.
-     *
-     * @param context the component context
-     */
-    private void readComponentConfiguration(ComponentContext context) {
-        Dictionary<?, ?> properties = context.getProperties();
-
-        String addressStr = Tools.get(properties, PROP_REST_SERVER_ADDRESS);
-        address = addressStr != null ? addressStr : PROP_REST_SERVER_ADDRESS_DEFAULT;
-        log.info("Configured. REST server address is {}", address);
-
-        Integer portConfigured = Tools.getIntegerProperty(properties, PROP_REST_SERVER_PORT);
-        if (portConfigured == null) {
-            port = PROP_REST_SERVER_PORT_DEFAULT;
-            log.info("REST server port is NOT configured, default value is {}", port);
-        } else {
-            port = portConfigured;
-            log.info("Configured. REST server port is {}", port);
-        }
-
-        String endpointStr = Tools.get(properties, PROP_REST_ENDPOINT);
-        endpoint = endpointStr != null ? endpointStr : PROP_REST_ENDPOINT_DEFAULT;
-        log.info("Configured. REST server endpoint is {}", endpoint);
-
-        String methodStr = Tools.get(properties, PROP_REST_METHOD);
-        method = methodStr != null ? methodStr : PROP_REST_METHOD_DEFAULT;
-        log.info("Configured. REST server default HTTP method is {}", method);
-
-        String requestMediaTypeStr = Tools.get(properties, PROP_REST_REQUEST_MEDIA_TYPE);
-        requestMediaType = requestMediaTypeStr != null ?
-                requestMediaTypeStr : PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-        log.info("Configured. REST server request media type is {}", requestMediaType);
-
-        String responseMediaTypeStr = Tools.get(properties, PROP_REST_RESPONSE_MEDIA_TYPE);
-        responseMediaType = responseMediaTypeStr != null ?
-                responseMediaTypeStr : PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-        log.info("Configured. REST server response media type is {}", responseMediaType);
-
-        Boolean enableServiceConfigured =
-                getBooleanProperty(properties, PROP_REST_ENABLE_SERVICE);
-        if (enableServiceConfigured == null) {
-            enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
-            log.info("REST service enable flag is NOT " +
-                    "configured, default value is {}", enableService);
-        } else {
-            enableService = enableServiceConfigured;
-            log.info("Configured. REST service enable flag is {}", enableService);
-        }
-    }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 9b4aaf9..48cef34 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -15,8 +15,11 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -32,6 +35,12 @@
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
 
 /**
  * REST telemetry manager.
@@ -48,8 +57,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private WebTarget target = null;
-    private RestTelemetryConfig restConfig = null;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
+    private Map<String, WebTarget> targets = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -69,85 +80,71 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (target != null) {
-            log.info("REST producer has already been started");
-            return;
-        }
+    public void start() {
 
-        restConfig = (RestTelemetryConfig) config;
+        telemetryConfigService.getConfigsByType(REST).forEach(c -> {
+            RestTelemetryConfig restConfig = fromTelemetryConfig(c);
 
-        StringBuilder restServerBuilder = new StringBuilder();
-        restServerBuilder.append(PROTOCOL);
-        restServerBuilder.append(":");
-        restServerBuilder.append("//");
-        restServerBuilder.append(restConfig.address());
-        restServerBuilder.append(":");
-        restServerBuilder.append(restConfig.port());
-        restServerBuilder.append("/");
+            if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
+                StringBuilder restServerBuilder = new StringBuilder();
+                restServerBuilder.append(PROTOCOL);
+                restServerBuilder.append(":");
+                restServerBuilder.append("//");
+                restServerBuilder.append(restConfig.address());
+                restServerBuilder.append(":");
+                restServerBuilder.append(restConfig.port());
+                restServerBuilder.append("/");
 
-        Client client = ClientBuilder.newBuilder().build();
+                Client client = ClientBuilder.newBuilder().build();
 
-        target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+                WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+
+                targets.put(c.name(), target);
+            }
+        });
 
         log.info("REST producer has Started");
     }
 
     @Override
     public void stop() {
-        if (target != null) {
-            target = null;
-        }
-
+        targets.clear(); // forget all targets so isRunning() turns false and publish() sends nothing
         log.info("REST producer has Stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
-    public Response publish(String endpoint, String method, String record) {
-        // TODO: need to find a way to invoke REST endpoint using target
-        return null;
-    }
+    public Set<Response> publish(String record) {
 
-    @Override
-    public Response publish(String method, String record) {
-        switch (method) {
-            case POST_METHOD:
-                return target.request(restConfig.requestMediaType())
-                        .post(Entity.json(record));
-            case GET_METHOD:
-                return target.request(restConfig.requestMediaType()).get();
-            default:
-                return null;
-        }
-    }
+        Set<Response> responses = Sets.newConcurrentHashSet();
 
-    @Override
-    public Response publish(String record) {
+        targets.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            RestTelemetryConfig restConfig = fromTelemetryConfig(config);
 
-        if (target == null) {
-            log.debug("REST telemetry service has not been enabled!");
-            return null;
-        }
+            switch (restConfig.method()) {
+                case POST_METHOD:
+                    responses.add(v.request(restConfig.requestMediaType())
+                            .post(Entity.json(record)));
+                    break;
+                case GET_METHOD:
+                    responses.add(v.request(restConfig.requestMediaType()).get());
+                    break;
+                default:
+                    break;
+            }
+        });
 
-        switch (restConfig.method()) {
-            case POST_METHOD:
-                return target.request(restConfig.requestMediaType())
-                        .post(Entity.json(record));
-            case GET_METHOD:
-                return target.request(restConfig.requestMediaType()).get();
-            default:
-                return null;
-        }
+        return responses;
     }
 
     @Override
     public boolean isRunning() {
-        return target != null;
+        return !targets.isEmpty();
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
new file mode 100644
index 0000000..3ae2504
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsNotFound;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+
+/**
+ * Provides implementation of administering and interfacing telemetry configs.
+ * It also provides telemetry config events for the various exporters or connectors.
+ * <p>
+ * Telemetry configs are kept in the {@link TelemetryConfigStore}, while the
+ * providers they were registered through are tracked in a consistent map.
+ * Provider registration and removal are only carried out by the node whose id
+ * matches the current leader of the application topic; on every other node
+ * those calls are no-ops.
+ * </p>
+ */
+@Component(
+    immediate = true,
+    service = {
+            TelemetryConfigService.class, TelemetryConfigAdminService.class
+    }
+)
+public class TelemetryConfigManager
+        extends ListenerRegistry<TelemetryConfigEvent, TelemetryConfigListener>
+        implements TelemetryConfigService, TelemetryConfigAdminService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final String MSG_TELEMETRY_CONFIG = "Telemetry config %s %s";
+    private static final String MSG_CREATED = "created";
+    private static final String MSG_UPDATED = "updated";
+    private static final String MSG_REMOVED = "removed";
+    private static final String MSG_NULL_PROVIDER =
+            "Ignoring {} of a null telemetry config provider";
+
+    private static final String ERR_NULL_CONFIG = "Telemetry config cannot be null";
+    private static final String NO_CONFIG = "Telemetry config not found";
+
+    // Appended after every config name when deriving a provider key, so that
+    // different name sequences (e.g. "ab"+"c" and "a"+"bc") do not collapse
+    // into the same key. This relies on config names not containing a comma.
+    private static final String KEY_DELIMITER = ",";
+
+    private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(TelemetryConfigProvider.class)
+                    .register(DefaultTelemetryConfigProvider.class)
+                    .register(TelemetryConfig.class)
+                    .register(TelemetryConfig.ConfigType.class)
+                    .register(DefaultTelemetryConfig.class)
+                    .build();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigStore telemetryConfigStore;
+
+    private final TelemetryConfigStoreDelegate
+                        delegate = new InternalTelemetryConfigStoreDelegate();
+
+    // registered providers, keyed by the value returned from providerKey()
+    private ConsistentMap<String, TelemetryConfigProvider> providerMap;
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    /**
+     * Registers the application, hooks the store delegate, joins the leadership
+     * contest for the application topic and builds the provider map.
+     */
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        telemetryConfigStore.setDelegate(delegate);
+        leadershipService.runForLeadership(appId.name());
+
+        providerMap = storageService.<String, TelemetryConfigProvider>consistentMapBuilder()
+                .withSerializer(Serializer.using(SERIALIZER_TELEMETRY_CONFIG))
+                .withName("openstack-telemetry-config-provider")
+                .withApplicationId(appId)
+                .build();
+
+        log.info("Started");
+    }
+
+    /**
+     * Detaches the store delegate, withdraws from the leadership contest and
+     * clears the provider map.
+     */
+    @Deactivate
+    protected void deactivate() {
+        telemetryConfigStore.unsetDelegate(delegate);
+        leadershipService.withdraw(appId.name());
+        providerMap.clear();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Returns an immutable snapshot of the providers currently held in the
+     * provider map.
+     *
+     * @return set of registered telemetry config providers
+     */
+    @Override
+    public Set<TelemetryConfigProvider> getProviders() {
+        return ImmutableSet.copyOf(providerMap.asJavaMap().values());
+    }
+
+    /**
+     * Stores every config offered by the given provider and records the
+     * provider itself. Nothing happens unless the local node is the leader;
+     * a null provider is logged and ignored.
+     *
+     * @param provider telemetry config provider to be registered
+     */
+    @Override
+    public void registerProvider(TelemetryConfigProvider provider) {
+        if (provider == null) {
+            log.warn(MSG_NULL_PROVIDER, "registration");
+            return;
+        }
+
+        if (!isLeader()) {
+            return;
+        }
+
+        provider.getTelemetryConfigs().forEach(config -> {
+            telemetryConfigStore.createTelemetryConfig(config);
+            log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_CREATED));
+        });
+        providerMap.put(providerKey(provider), provider);
+    }
+
+    /**
+     * Removes every config offered by the given provider from the store and
+     * forgets the provider itself. Nothing happens unless the local node is
+     * the leader; a null provider is logged and ignored.
+     *
+     * @param provider telemetry config provider to be unregistered
+     */
+    @Override
+    public void unregisterProvider(TelemetryConfigProvider provider) {
+        if (provider == null) {
+            log.warn(MSG_NULL_PROVIDER, "unregistration");
+            return;
+        }
+
+        if (!isLeader()) {
+            return;
+        }
+
+        provider.getTelemetryConfigs().forEach(config -> {
+            telemetryConfigStore.removeTelemetryConfig(config.name());
+            log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_REMOVED));
+        });
+        providerMap.remove(providerKey(provider));
+    }
+
+    /**
+     * Pushes the given config to the store as an update.
+     *
+     * @param config telemetry config carrying the new values
+     * @throws NullPointerException if the config is null
+     */
+    @Override
+    public void updateTelemetryConfig(TelemetryConfig config) {
+        checkNotNull(config, ERR_NULL_CONFIG);
+
+        telemetryConfigStore.updateTelemetryConfig(config);
+        log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_UPDATED));
+    }
+
+    /**
+     * Looks up a config by name in the store; the lookup result is passed
+     * through {@code nullIsNotFound}, so a missing config is not returned as null.
+     *
+     * @param name telemetry config name
+     * @return telemetry config with the given name
+     */
+    @Override
+    public TelemetryConfig getConfig(String name) {
+        return nullIsNotFound(telemetryConfigStore.telemetryConfig(name), NO_CONFIG);
+    }
+
+    /**
+     * Returns an immutable copy of the stored configs of the given type.
+     *
+     * @param type telemetry config type
+     * @return set of telemetry configs of that type
+     */
+    @Override
+    public Set<TelemetryConfig> getConfigsByType(ConfigType type) {
+        return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigsByType(type));
+    }
+
+    /**
+     * Returns an immutable copy of all stored configs.
+     *
+     * @return set of telemetry configs
+     */
+    @Override
+    public Set<TelemetryConfig> getConfigs() {
+        return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigs());
+    }
+
+    // Derives the provider map key from the names of the provider's configs;
+    // each name is followed by KEY_DELIMITER to keep name boundaries visible.
+    private String providerKey(TelemetryConfigProvider provider) {
+        StringBuilder key = new StringBuilder();
+        provider.getTelemetryConfigs().forEach(config ->
+                key.append(config.name()).append(KEY_DELIMITER));
+        return key.toString();
+    }
+
+    // Tells whether the local node is the current leader of the application topic.
+    private boolean isLeader() {
+        return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+    }
+
+    /**
+     * Store delegate which hands every non-null store event over to
+     * {@code process(event)} of the enclosing manager.
+     */
+    private class InternalTelemetryConfigStoreDelegate implements TelemetryConfigStoreDelegate {
+
+        @Override
+        public void notify(TelemetryConfigEvent event) {
+            if (event != null) {
+                log.trace("send telemetry config event {}", event);
+                process(event);
+            }
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
new file mode 100644
index 0000000..39a5df0
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.UNKNOWN;
+
+/**
+ * Utility capable of reading telemetry configuration XML resources and producing
+ * a telemetry config as a result.
+ * <p>
+ * The telemetry configurations stream structure is as follows:
+ * </p>
+ * <pre>
+ *     &lt;configs&gt;
+ *         &lt;config name=“...” [manufacturer="..." swVersion="..."]&gt;
+ *            [&lt;property name=“key”&gt;value&lt;/key&gt;]
+ *            ...
+ *        &lt;/config&gt;
+ *        ...
+ *     &lt;/configs&gt;
+ * </pre>
+ */
+public class XmlTelemetryConfigLoader {
+
+    private static final String CONFIGS = "configs";
+    private static final String CONFIG = "config";
+
+    private static final String PROPERTY = "property";
+
+    private static final String TRUE = "true";
+
+    private static final String NAME = "[@name]";
+    private static final String TYPE = "[@type]";
+    private static final String EXTENDS = "[@extends]";
+    private static final String MFG = "[@manufacturer]";
+    private static final String SW = "[@swVersion]";
+    private static final String ENABLED = "[@enabled]";
+
+    private Map<String, TelemetryConfig> configs = Maps.newHashMap();
+
+    /**
+     * Creates a new config loader capable of loading configs from the supplied
+     * class loader.
+     */
+    public XmlTelemetryConfigLoader() {
+    }
+
+    /**
+     * Loads the specified telemetry configs resource as an XML stream and parses
+     * it to produce a ready-to-register config provider.
+     *
+     * @param configsStream stream containing the telemetry configs definition
+     * @return telemetry configuration provider
+     * @throws IOException if issues are encountered reading the stream
+     *                     or parsing the telemetry config definition within
+     */
+    public DefaultTelemetryConfigProvider
+            loadTelemetryConfigs(InputStream configsStream) throws IOException {
+        try {
+            XMLConfiguration cfg = new XMLConfiguration();
+            cfg.setRootElementName(CONFIGS);
+            cfg.setAttributeSplittingDisabled(true);
+
+            cfg.load(configsStream);
+            return loadTelemetryConfigs(cfg);
+        } catch (ConfigurationException e) {
+            throw new IOException("Unable to load telemetry configs", e);
+        }
+    }
+
+    /**
+     * Loads a telemetry config provider from the supplied hierarchical configuration.
+     *
+     * @param telemetryConfig hierarchical configuration containing the configs definition
+     * @return telemetry configuration provider
+     */
+    public DefaultTelemetryConfigProvider
+                loadTelemetryConfigs(HierarchicalConfiguration telemetryConfig) {
+        DefaultTelemetryConfigProvider provider = new DefaultTelemetryConfigProvider();
+        for (HierarchicalConfiguration cfg : telemetryConfig.configurationsAt(CONFIG)) {
+            DefaultTelemetryConfig config = loadTelemetryConfig(cfg);
+            configs.put(config.name(), config);
+            provider.addConfig(config);
+        }
+        configs.clear();
+        return provider;
+    }
+
+    /**
+     * Loads a telemetry configuration from the supplied hierarchical configuration.
+     *
+     * @param telemetryCfg hierarchical configuration containing the telemetry config definition
+     * @return telemetry configuration
+     */
+    public DefaultTelemetryConfig loadTelemetryConfig(HierarchicalConfiguration telemetryCfg) {
+        String name = telemetryCfg.getString(NAME);
+        String parentsString = telemetryCfg.getString(EXTENDS, "");
+        List<TelemetryConfig> parents = Lists.newArrayList();
+
+        if (!"".equals(parentsString)) {
+            List<String> parentsNames;
+            if (parentsString.contains(",")) {
+                parentsNames = Arrays.asList(
+                        parentsString.replace(" ", "").split(","));
+            } else {
+                parentsNames = Lists.newArrayList(parentsString);
+            }
+            parents = parentsNames.stream().map(parent -> (parent != null) ?
+                    configs.get(parent) : null).collect(Collectors.toList());
+        }
+
+        String typeStr = telemetryCfg.getString(TYPE, getParentAttribute(parents, TYPE));
+        String manufacturer = telemetryCfg.getString(MFG, getParentAttribute(parents, MFG));
+        String swVersion = telemetryCfg.getString(SW, getParentAttribute(parents, SW));
+
+        // note that we do not inherits enabled property from parent
+        String enabledStr = telemetryCfg.getString(ENABLED);
+
+        boolean enabled = enabledStr != null && enabledStr.equalsIgnoreCase(TRUE);
+
+        TelemetryConfig.ConfigType type = type(typeStr);
+
+        if (type == null) {
+            return null;
+        }
+
+        return new DefaultTelemetryConfig(name, type, parents, manufacturer,
+                swVersion, enabled, parseProperties(parents, telemetryCfg));
+    }
+
+    private TelemetryConfig.ConfigType type(String typeStr) {
+        switch (typeStr.toUpperCase()) {
+            case "GRPC" :
+                return GRPC;
+            case "KAFKA":
+                return KAFKA;
+            case "REST":
+                return REST;
+            case "INFLUXDB":
+                return INFLUXDB;
+            case "PROMETHEUS":
+                return PROMETHEUS;
+            case "UNKNOWN":
+            default:
+                return UNKNOWN;
+        }
+    }
+
+    // Returns the specified property from the highest priority parent
+    private String getParentAttribute(List<TelemetryConfig> parents, String attribute) {
+        if (!parents.isEmpty()) {
+            TelemetryConfig parent = parents.get(0);
+            switch (attribute) {
+                case TYPE:
+                    return parent.type().name().toLowerCase();
+                case MFG:
+                    return parent.manufacturer();
+                case SW:
+                    return parent.swVersion();
+                default:
+                    throw new IllegalArgumentException("Unsupported attribute");
+            }
+        }
+        return "";
+    }
+
+    // Parses the properties section.
+    private Map<String, String> parseProperties(List<TelemetryConfig> parents,
+                                                HierarchicalConfiguration config) {
+        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+        // note that, we only allow the inheritance from single source
+        Map<String, String> parentConfigs = Maps.newHashMap();
+        if (!parents.isEmpty()) {
+            TelemetryConfig parent = parents.get(0);
+            parentConfigs = parent.properties();
+        }
+
+        properties.putAll(parentConfigs);
+
+        for (HierarchicalConfiguration b : config.configurationsAt(PROPERTY)) {
+            properties.put(b.getString(NAME), (String) b.getRootNode().getValue());
+        }
+        return properties.build();
+    }
+}