Improve the telemetry service LCM granularity

Change-Id: I3d1b43b360883b0644af2341bdc21aeb4f603414
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 85f36e3..152bfde 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -83,7 +83,7 @@
 
     @Deactivate
     protected void deactivate() {
-        stop();
+        stopAll();
 
         openstackTelemetryService.removeTelemetryService(this);
 
@@ -91,52 +91,6 @@
     }
 
     @Override
-    public void start() {
-        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
-            KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
-
-            if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
-                StringBuilder kafkaServerBuilder = new StringBuilder();
-                kafkaServerBuilder.append(kafkaConfig.address());
-                kafkaServerBuilder.append(":");
-                kafkaServerBuilder.append(kafkaConfig.port());
-
-                // Configure Kafka server properties
-                Properties prop = new Properties();
-                prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
-                prop.put(RETRIES, kafkaConfig.retries());
-                prop.put(ACKS, kafkaConfig.requiredAcks());
-                prop.put(BATCH_SIZE, kafkaConfig.batchSize());
-                prop.put(LINGER_MS, kafkaConfig.lingerMs());
-                prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
-                prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
-                prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
-
-                producers.put(c.name(), new KafkaProducer<>(prop));
-            }
-        });
-
-        log.info("Kafka producer has Started");
-    }
-
-    @Override
-    public void stop() {
-        if (!producers.isEmpty()) {
-            producers.values().forEach(Producer::close);
-        }
-
-        producers.clear();
-
-        log.info("Kafka producer has Stopped");
-    }
-
-    @Override
-    public void restart() {
-        stop();
-        start();
-    }
-
-    @Override
     public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
 
         if (producers == null || producers.isEmpty()) {
@@ -170,4 +124,70 @@
     public boolean isRunning() {
         return !producers.isEmpty();
     }
+
+    @Override
+    public void start(String name) {
+        TelemetryConfig config = telemetryConfigService.getConfig(name);
+        KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+        if (kafkaConfig != null &&
+                !config.name().equals(KAFKA_SCHEME) && config.enabled()) {
+            StringBuilder kafkaServerBuilder = new StringBuilder();
+            kafkaServerBuilder.append(kafkaConfig.address());
+            kafkaServerBuilder.append(":");
+            kafkaServerBuilder.append(kafkaConfig.port());
+
+            // Configure Kafka server properties
+            Properties prop = new Properties();
+            prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+            prop.put(RETRIES, kafkaConfig.retries());
+            prop.put(ACKS, kafkaConfig.requiredAcks());
+            prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+            prop.put(LINGER_MS, kafkaConfig.lingerMs());
+            prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+            prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+            prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+
+            producers.put(config.name(), new KafkaProducer<>(prop));
+        }
+    }
+
+    @Override
+    public void stop(String name) {
+        Producer<String, byte[]> producer = producers.get(name);
+
+        if (producer != null) {
+            producer.close();
+            producers.remove(name);
+        }
+    }
+
+    @Override
+    public void restart(String name) {
+        stop(name);
+        start(name);
+    }
+
+    @Override
+    public void startAll() {
+        telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
+        log.info("Kafka producer has Started");
+    }
+
+    @Override
+    public void stopAll() {
+        if (!producers.isEmpty()) {
+            producers.values().forEach(Producer::close);
+        }
+
+        producers.clear();
+
+        log.info("Kafka producer has Stopped");
+    }
+
+    @Override
+    public void restartAll() {
+        stopAll();
+        startAll();
+    }
 }