Improve the telemetry service LCM granularity

Change-Id: I3d1b43b360883b0644af2341bdc21aeb4f603414
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
index 62398db..63e0c3f 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
@@ -22,16 +22,37 @@
 
     /**
      * Prepares and launches the telemetry producer.
+     *
+     * @param name telemetry service name
      */
-    void start();
+    void start(String name);
 
     /**
      * Terminates the telemetry producer.
+     *
+     * @param name telemetry service name
      */
-    void stop();
+    void stop(String name);
 
     /**
      * Restarts the telemetry producer.
+     *
+     * @param name telemetry service name
      */
-    void restart();
+    void restart(String name);
+
+    /**
+     * Launches all telemetry services.
+     */
+    void startAll();
+
+    /**
+     * Terminates all telemetry services.
+     */
+    void stopAll();
+
+    /**
+     * Restarts all telemetry services.
+     */
+    void restartAll();
 }
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
index a0c2714..3f22437 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
@@ -21,7 +21,8 @@
 /**
  * Describes telemetry config event.
  */
-public class TelemetryConfigEvent extends AbstractEvent<TelemetryConfigEvent.Type, TelemetryConfig> {
+public class TelemetryConfigEvent
+        extends AbstractEvent<TelemetryConfigEvent.Type, TelemetryConfig> {
 
     /**
      * Telemetry config event type.
@@ -38,7 +39,15 @@
         /**
          * Signifies that an existing telemetry config is removed.
          */
-        CONFIG_DELETED
+        CONFIG_DELETED,
+        /**
+         * Signifies that a telemetry service is enabled.
+         */
+        SERVICE_ENABLED,
+        /**
+         * Signifies that a telemetry service is disabled.
+         */
+        SERVICE_DISABLED
     }
 
     /**
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
index b1d6133..8fc31a6 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
@@ -51,6 +51,8 @@
 import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_ADDED;
 import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_DELETED;
 import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_DISABLED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_ENABLED;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -188,6 +190,9 @@
         private void processTelemetryConfigMapUpdate(MapEvent<String,
                                                      TelemetryConfig> event) {
             log.debug("Telemetry config updated");
+
+            processTelemetryServiceStatusChange(event);
+
             notifyDelegate(new TelemetryConfigEvent(
                     CONFIG_UPDATED, event.newValue().value()));
         }
@@ -198,5 +203,21 @@
             notifyDelegate(new TelemetryConfigEvent(
                     CONFIG_DELETED, event.oldValue().value()));
         }
+
+        private void processTelemetryServiceStatusChange(
+                                    MapEvent<String, TelemetryConfig> event) {
+            TelemetryConfig oldValue = event.oldValue().value();
+            TelemetryConfig newValue = event.newValue().value();
+
+            if (oldValue.enabled() && !newValue.enabled()) {
+                log.debug("Telemetry service {} has been disabled!", newValue.name());
+                notifyDelegate(new TelemetryConfigEvent(SERVICE_DISABLED, newValue));
+            }
+
+            if (!oldValue.enabled() && newValue.enabled()) {
+                log.debug("Telemetry service {} has been enabled!", newValue.name());
+                notifyDelegate(new TelemetryConfigEvent(SERVICE_ENABLED, newValue));
+            }
+        }
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index f487504..92e9aef 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,13 +15,14 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -30,7 +31,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Set;
+import java.util.Map;
 
 import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
 import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
@@ -50,7 +51,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected TelemetryConfigService telemetryConfigService;
 
-    private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
+    private Map<String, ManagedChannel> channels = Maps.newConcurrentMap();
 
     @Activate
     protected void activate() {
@@ -62,7 +63,7 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
 
         openstackTelemetryService.removeTelemetryService(this);
 
@@ -70,34 +71,53 @@
     }
 
     @Override
-    public void start() {
-        telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
-            GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(config);
 
-            if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
-                ManagedChannel channel = ManagedChannelBuilder
-                        .forAddress(grpcConfig.address(), grpcConfig.port())
-                        .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
-                        .usePlaintext(grpcConfig.usePlaintext())
-                        .build();
+        if (grpcConfig != null && !config.name().equals(GRPC_SCHEME) && config.enabled()) {
+            ManagedChannel channel = ManagedChannelBuilder
+                    .forAddress(grpcConfig.address(), grpcConfig.port())
+                    .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+                    .usePlaintext(grpcConfig.usePlaintext())
+                    .build();
 
-                channels.add(channel);
-            }
-        });
+            channels.put(name, channel);
+        }
+    }
 
+    @Override
+    public void stop(String name) {
+        ManagedChannel channel = channels.get(name);
+
+        if (channel != null) {
+            channel.shutdown();
+            channels.remove(name);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
+
+    @Override
+    public void startAll() {
+        telemetryConfigService.getConfigsByType(GRPC).forEach(c -> start(c.name()));
         log.info("gRPC producer has Started");
     }
 
     @Override
-    public void stop() {
-        channels.forEach(ManagedChannel::shutdown);
+    public void stopAll() {
+        channels.values().forEach(ManagedChannel::shutdown);
         log.info("gRPC producer has Stopped");
     }
 
     @Override
-    public void restart() {
-        stop();
-        start();
+    public void restartAll() {
+        stopAll();
+        startAll();
     }
 
     @Override
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 781c4e2..962779a 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -93,7 +93,7 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
 
         openstackTelemetryService.removeTelemetryService(this);
 
@@ -101,47 +101,6 @@
     }
 
     @Override
-    public void start() {
-
-        telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
-            InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
-
-            if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
-                StringBuilder influxDbServerBuilder = new StringBuilder();
-                influxDbServerBuilder.append(INFLUX_PROTOCOL);
-                influxDbServerBuilder.append(":");
-                influxDbServerBuilder.append("//");
-                influxDbServerBuilder.append(influxDbConfig.address());
-                influxDbServerBuilder.append(":");
-                influxDbServerBuilder.append(influxDbConfig.port());
-
-                InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
-                        influxDbConfig.username(), influxDbConfig.password());
-                producers.put(c.name(), producer);
-
-                createDB(producer, influxDbConfig.database());
-            }
-        });
-
-        log.info("InfluxDB producer has Started");
-    }
-
-    @Override
-    public void stop() {
-        if (producers != null) {
-            producers.values().forEach(InfluxDB::close);
-        }
-
-        log.info("InfluxDB producer has stopped");
-    }
-
-    @Override
-    public void restart() {
-        stop();
-        start();
-    }
-
-    @Override
     public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
         if (producers == null || producers.isEmpty()) {
             log.debug("InfluxDB telemetry service has not been enabled!");
@@ -224,4 +183,63 @@
         }
         return tpPort.toString();
     }
+
+    @Override
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
+
+        if (influxDbConfig != null &&
+                !config.name().equals(INFLUXDB_SCHEME) && config.enabled()) {
+            StringBuilder influxDbServerBuilder = new StringBuilder();
+            influxDbServerBuilder.append(INFLUX_PROTOCOL);
+            influxDbServerBuilder.append(":");
+            influxDbServerBuilder.append("//");
+            influxDbServerBuilder.append(influxDbConfig.address());
+            influxDbServerBuilder.append(":");
+            influxDbServerBuilder.append(influxDbConfig.port());
+
+            InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+                    influxDbConfig.username(), influxDbConfig.password());
+            producers.put(config.name(), producer);
+
+            createDB(producer, influxDbConfig.database());
+        }
+    }
+
+    @Override
+    public void stop(String name) {
+        InfluxDB producer = producers.get(name);
+
+        if (producer != null) {
+            producer.close();
+            producers.remove(name);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
+
+    @Override
+    public void startAll() {
+        telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> start(c.name()));
+        log.info("InfluxDB producer has Started");    }
+
+    @Override
+    public void stopAll() {
+        if (producers != null) {
+            producers.values().forEach(InfluxDB::close);
+        }
+
+        log.info("InfluxDB producer has stopped");
+    }
+
+    @Override
+    public void restartAll() {
+        stopAll();
+        startAll();
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 85f36e3..152bfde 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -83,7 +83,7 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
 
         openstackTelemetryService.removeTelemetryService(this);
 
@@ -91,52 +91,6 @@
     }
 
     @Override
-    public void start() {
-        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
-            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
-
-            if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
-                StringBuilder kafkaServerBuilder = new StringBuilder();
-                kafkaServerBuilder.append(kafkaConfig.address());
-                kafkaServerBuilder.append(":");
-                kafkaServerBuilder.append(kafkaConfig.port());
-
-                // Configure Kafka server properties
-                Properties prop = new Properties();
-                prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
-                prop.put(RETRIES, kafkaConfig.retries());
-                prop.put(ACKS, kafkaConfig.requiredAcks());
-                prop.put(BATCH_SIZE, kafkaConfig.batchSize());
-                prop.put(LINGER_MS, kafkaConfig.lingerMs());
-                prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
-                prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
-                prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
-
-                producers.put(c.name(), new KafkaProducer<>(prop));
-            }
-        });
-
-        log.info("Kafka producer has Started");
-    }
-
-    @Override
-    public void stop() {
-        if (!producers.isEmpty()) {
-            producers.values().forEach(Producer::close);
-        }
-
-        producers.clear();
-
-        log.info("Kafka producer has Stopped");
-    }
-
-    @Override
-    public void restart() {
-        stop();
-        start();
-    }
-
-    @Override
     public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
         if (producers == null || producers.isEmpty()) {
@@ -170,4 +124,70 @@
     public boolean isRunning() {
         return !producers.isEmpty();
     }
+
+    @Override
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+        if (kafkaConfig != null &&
+                !config.name().equals(KAFKA_SCHEME) && config.enabled()) {
+            StringBuilder kafkaServerBuilder = new StringBuilder();
+            kafkaServerBuilder.append(kafkaConfig.address());
+            kafkaServerBuilder.append(":");
+            kafkaServerBuilder.append(kafkaConfig.port());
+
+            // Configure Kafka server properties
+            Properties prop = new Properties();
+            prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+            prop.put(RETRIES, kafkaConfig.retries());
+            prop.put(ACKS, kafkaConfig.requiredAcks());
+            prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+            prop.put(LINGER_MS, kafkaConfig.lingerMs());
+            prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+            prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+            prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+
+            producers.put(config.name(), new KafkaProducer<>(prop));
+        }
+    }
+
+    @Override
+    public void stop(String name) {
+        Producer<String, byte[]> producer = producers.get(name);
+
+        if (producer != null) {
+            producer.close();
+            producers.remove(name);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
+
+    @Override
+    public void startAll() {
+        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
+        log.info("Kafka producer has Started");
+    }
+
+    @Override
+    public void stopAll() {
+        if (!producers.isEmpty()) {
+            producers.values().forEach(Producer::close);
+        }
+
+        producers.clear();
+
+        log.info("Kafka producer has Stopped");
+    }
+
+    @Override
+    public void restartAll() {
+        stopAll();
+        startAll();
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index ce325cd..d49cdc0 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -154,8 +154,15 @@
             TelemetryAdminService service =
                     telemetryService(event.subject().type().name());
 
-            if (service != null) {
-                service.restart();
+            switch (event.type()) {
+                case SERVICE_ENABLED:
+                    service.start(event.subject().name());
+                    break;
+                case SERVICE_DISABLED:
+                    service.stop(event.subject().name());
+                    break;
+                default:
+                    break;
             }
         }
     }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
index 7111fc0..3ccc02e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
@@ -15,7 +15,7 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
 import io.prometheus.client.exporter.MetricsServlet;
@@ -28,6 +28,7 @@
 import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -37,6 +38,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Set;
 
 import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
@@ -51,7 +53,7 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
+    private Map<String, Server> prometheusExporters = Maps.newConcurrentMap();
 
     private static final String FLOW_TYPE = "flowType";
     private static final String DEVICE_ID = "deviceId";
@@ -149,44 +151,21 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
         openstackTelemetryService.removeTelemetryService(this);
         log.info("Stopped");
     }
 
     @Override
-    public void start() {
+    public void startAll() {
         log.info("Prometheus exporter starts.");
 
-        telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
-            PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
-
-            if (prometheusConfig != null &&
-                    !c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
-                try {
-                    // TODO  Offer a 'Authentication'
-                    Server prometheusExporter = new Server(prometheusConfig.port());
-                    ServletContextHandler context = new ServletContextHandler();
-                    context.setContextPath("/");
-                    prometheusExporter.setHandler(context);
-                    context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
-
-                    log.info("Prometheus server start");
-
-                    prometheusExporter.start();
-
-                    prometheusExporters.add(prometheusExporter);
-
-                } catch (Exception ex) {
-                    log.warn("Exception: {}", ex);
-                }
-            }
-        });
+        telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> start(c.name()));
     }
 
     @Override
-    public void stop() {
-        prometheusExporters.forEach(pe -> {
+    public void stopAll() {
+        prometheusExporters.values().forEach(pe -> {
             try {
                 pe.stop();
             } catch (Exception e) {
@@ -197,9 +176,9 @@
     }
 
     @Override
-    public void restart() {
-        stop();
-        start();
+    public void restartAll() {
+        stopAll();
+        startAll();
     }
 
     @Override
@@ -269,4 +248,50 @@
     public boolean isRunning() {
         return !prometheusExporters.isEmpty();
     }
+
+    @Override
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(config);
+
+        if (prometheusConfig != null &&
+                !config.name().equals(PROMETHEUS_SCHEME) && config.enabled()) {
+            try {
+                // TODO  Offer a 'Authentication'
+                Server prometheusExporter = new Server(prometheusConfig.port());
+                ServletContextHandler context = new ServletContextHandler();
+                context.setContextPath("/");
+                prometheusExporter.setHandler(context);
+                context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+                log.info("Prometheus server start");
+
+                prometheusExporter.start();
+
+                prometheusExporters.put(name, prometheusExporter);
+
+            } catch (Exception ex) {
+                log.warn("Exception: {}", ex);
+            }
+        }
+    }
+
+    @Override
+    public void stop(String name) {
+        try {
+            Server pe = prometheusExporters.get(name);
+            if (pe != null) {
+                pe.stop();
+                prometheusExporters.remove(name);
+            }
+        } catch (Exception e) {
+            log.warn("Failed to stop prometheus server due to {}", e);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 48cef34..f5641bd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -72,7 +72,7 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
 
         openstackTelemetryService.removeTelemetryService(this);
 
@@ -80,42 +80,23 @@
     }
 
     @Override
-    public void start() {
+    public void startAll() {
 
-        telemetryConfigService.getConfigsByType(REST).forEach(c -> {
-            RestTelemetryConfig restConfig = fromTelemetryConfig(c);
-
-            if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
-                StringBuilder restServerBuilder = new StringBuilder();
-                restServerBuilder.append(PROTOCOL);
-                restServerBuilder.append(":");
-                restServerBuilder.append("//");
-                restServerBuilder.append(restConfig.address());
-                restServerBuilder.append(":");
-                restServerBuilder.append(restConfig.port());
-                restServerBuilder.append("/");
-
-                Client client = ClientBuilder.newBuilder().build();
-
-                WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
-
-                targets.put(c.name(), target);
-            }
-        });
+        telemetryConfigService.getConfigsByType(REST).forEach(c -> start(c.name()));
 
         log.info("REST producer has Started");
     }
 
     @Override
-    public void stop() {
+    public void stopAll() {
         targets.values().forEach(t -> t = null);
         log.info("REST producer has Stopped");
     }
 
     @Override
-    public void restart() {
-        stop();
-        start();
+    public void restartAll() {
+        stopAll();
+        startAll();
     }
 
     @Override
@@ -147,4 +128,45 @@
     public boolean isRunning() {
         return !targets.isEmpty();
     }
+
+    @Override
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        RestTelemetryConfig restConfig = fromTelemetryConfig(config);
+
+        if (restConfig != null &&
+                !config.name().equals(REST_SCHEME) && config.enabled()) {
+            StringBuilder restServerBuilder = new StringBuilder();
+            restServerBuilder.append(PROTOCOL);
+            restServerBuilder.append(":");
+            restServerBuilder.append("//");
+            restServerBuilder.append(restConfig.address());
+            restServerBuilder.append(":");
+            restServerBuilder.append(restConfig.port());
+            restServerBuilder.append("/");
+
+            Client client = ClientBuilder.newBuilder().build();
+
+            WebTarget target = client.target(
+                    restServerBuilder.toString()).path(restConfig.endpoint());
+
+            targets.put(config.name(), target);
+        }
+    }
+
+    @Override
+    public void stop(String name) {
+        WebTarget target = targets.get(name);
+
+        if (target != null) {
+            target = null;
+            targets.remove(name);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
 }