Improve the telemetry service life cycle management (LCM) granularity
Change-Id: I3d1b43b360883b0644af2341bdc21aeb4f603414
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 85f36e3..152bfde 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -83,7 +83,7 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
@@ -91,52 +91,6 @@
}
@Override
- public void start() {
- telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
- KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
-
- if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
- StringBuilder kafkaServerBuilder = new StringBuilder();
- kafkaServerBuilder.append(kafkaConfig.address());
- kafkaServerBuilder.append(":");
- kafkaServerBuilder.append(kafkaConfig.port());
-
- // Configure Kafka server properties
- Properties prop = new Properties();
- prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
- prop.put(RETRIES, kafkaConfig.retries());
- prop.put(ACKS, kafkaConfig.requiredAcks());
- prop.put(BATCH_SIZE, kafkaConfig.batchSize());
- prop.put(LINGER_MS, kafkaConfig.lingerMs());
- prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
- prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
- prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
-
- producers.put(c.name(), new KafkaProducer<>(prop));
- }
- });
-
- log.info("Kafka producer has Started");
- }
-
- @Override
- public void stop() {
- if (!producers.isEmpty()) {
- producers.values().forEach(Producer::close);
- }
-
- producers.clear();
-
- log.info("Kafka producer has Stopped");
- }
-
- @Override
- public void restart() {
- stop();
- start();
- }
-
- @Override
public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
if (producers == null || producers.isEmpty()) {
@@ -170,4 +124,70 @@
public boolean isRunning() {
return !producers.isEmpty();
}
+
+ /**
+ * Starts the Kafka producer for the telemetry config with the given name.
+ *
+ * <p>Nothing is started when no config is found for {@code name}, when the
+ * config cannot be converted into a Kafka config, when its name equals
+ * {@code KAFKA_SCHEME}, or when it is not enabled. If a producer is already
+ * registered under the config name, it is closed after being replaced.
+ *
+ * @param name name of the telemetry config to look up
+ */
+ @Override
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+
+ // NOTE(review): assumes getConfig() yields null for an unknown name - confirm.
+ if (config == null) {
+ log.warn("No Kafka telemetry config found with name {}", name);
+ return;
+ }
+
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+ if (kafkaConfig == null ||
+ config.name().equals(KAFKA_SCHEME) || !config.enabled()) {
+ return;
+ }
+
+ // Configure Kafka server properties
+ Properties prop = new Properties();
+ prop.put(BOOTSTRAP_SERVERS, kafkaConfig.address() + ":" + kafkaConfig.port());
+ prop.put(RETRIES, kafkaConfig.retries());
+ prop.put(ACKS, kafkaConfig.requiredAcks());
+ prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+ prop.put(LINGER_MS, kafkaConfig.lingerMs());
+ prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+ prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+ prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+
+ // put() hands back any producer previously registered under this name;
+ // close it so that it is not left open with no remaining reference.
+ Producer<String, byte[]> previous =
+ producers.put(config.name(), new KafkaProducer<>(prop));
+
+ if (previous != null) {
+ previous.close();
+ }
+ }
+
+ /**
+ * Stops the Kafka producer registered under the given name, if there is one.
+ *
+ * @param name key of the producer in {@code producers}
+ */
+ @Override
+ public void stop(String name) {
+ // Unregister before closing so that a closed producer is never left in
+ // the map, even when close() throws.
+ Producer<String, byte[]> producer = producers.remove(name);
+
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ /**
+ * Restarts the producer of the named telemetry config by calling
+ * {@link #stop(String)} followed by {@link #start(String)}.
+ *
+ * @param name name handed to both calls
+ */
+ @Override
+ public void restart(String name) {
+ // Tear down whatever is registered under this name first ...
+ stop(name);
+ // ... then start it again.
+ start(name);
+ }
+
+ /**
+ * Invokes {@link #start(String)} for every telemetry config of type
+ * {@code KAFKA}, then logs that the producers have been started.
+ */
+ @Override
+ public void startAll() {
+ telemetryConfigService.getConfigsByType(KAFKA).forEach(config -> {
+ String configName = config.name();
+ start(configName);
+ });
+
+ log.info("Kafka producer has Started");
+ }
+
+ /**
+ * Closes every registered Kafka producer, empties {@code producers} and
+ * logs that the producers have been stopped.
+ */
+ @Override
+ public void stopAll() {
+ // Iterating an empty map is a no-op, so no emptiness check is needed.
+ for (Producer<String, byte[]> producer : producers.values()) {
+ producer.close();
+ }
+
+ producers.clear();
+
+ log.info("Kafka producer has Stopped");
+ }
+
+ /**
+ * Restarts all producers by calling {@link #stopAll()} followed by
+ * {@link #startAll()}.
+ */
+ @Override
+ public void restartAll() {
+ // Close and drop every registered producer first ...
+ stopAll();
+ // ... then start them again.
+ startAll();
+ }
}