Improve the telemetry service LCM granularity
Change-Id: I3d1b43b360883b0644af2341bdc21aeb4f603414
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
index 62398db..63e0c3f 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
@@ -22,16 +22,37 @@
/**
* Prepares and launches the telemetry producer.
+ *
+ * @param name telemetry service name
*/
- void start();
+ void start(String name);
/**
* Terminates the telemetry producer.
+ *
+ * @param name telemetry service name
*/
- void stop();
+ void stop(String name);
/**
* Restarts the telemetry producer.
+ *
+ * @param name telemetry service name
*/
- void restart();
+ void restart(String name);
+
+ /**
+ * Launches all telemetry services.
+ */
+ void startAll();
+
+ /**
+ * Terminates all telemetry services.
+ */
+ void stopAll();
+
+ /**
+ * Restarts all telemetry services.
+ */
+ void restartAll();
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
index a0c2714..3f22437 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
@@ -21,7 +21,8 @@
/**
* Describes telemetry config event.
*/
-public class TelemetryConfigEvent extends AbstractEvent<TelemetryConfigEvent.Type, TelemetryConfig> {
+public class TelemetryConfigEvent
+ extends AbstractEvent<TelemetryConfigEvent.Type, TelemetryConfig> {
/**
* Telemetry config event type.
@@ -38,7 +39,15 @@
/**
* Signifies that an existing telemetry config is removed.
*/
- CONFIG_DELETED
+ CONFIG_DELETED,
+ /**
+ * Signifies that a telemetry service is enabled.
+ */
+ SERVICE_ENABLED,
+ /**
+ * Signifies that a telemetry service is disabled.
+ */
+ SERVICE_DISABLED
}
/**
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
index b1d6133..8fc31a6 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
@@ -51,6 +51,8 @@
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_ADDED;
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_DELETED;
import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_DISABLED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.SERVICE_ENABLED;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -188,6 +190,9 @@
private void processTelemetryConfigMapUpdate(MapEvent<String,
TelemetryConfig> event) {
log.debug("Telemetry config updated");
+
+ processTelemetryServiceStatusChange(event);
+
notifyDelegate(new TelemetryConfigEvent(
CONFIG_UPDATED, event.newValue().value()));
}
@@ -198,5 +203,21 @@
notifyDelegate(new TelemetryConfigEvent(
CONFIG_DELETED, event.oldValue().value()));
}
+
+ private void processTelemetryServiceStatusChange(
+ MapEvent<String, TelemetryConfig> event) {
+ TelemetryConfig oldValue = event.oldValue().value();
+ TelemetryConfig newValue = event.newValue().value();
+
+ if (oldValue.enabled() && !newValue.enabled()) {
+ log.debug("Telemetry service {} has been disabled!", newValue.name());
+ notifyDelegate(new TelemetryConfigEvent(SERVICE_DISABLED, newValue));
+ }
+
+ if (!oldValue.enabled() && newValue.enabled()) {
+ log.debug("Telemetry service {} has been enabled!", newValue.name());
+ notifyDelegate(new TelemetryConfigEvent(SERVICE_ENABLED, newValue));
+ }
+ }
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index f487504..92e9aef 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,13 +15,14 @@
*/
package org.onosproject.openstacktelemetry.impl;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -30,7 +31,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Set;
+import java.util.Map;
import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
@@ -50,7 +51,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected TelemetryConfigService telemetryConfigService;
- private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
+ private Map<String, ManagedChannel> channels = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -62,7 +63,7 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
@@ -70,34 +71,53 @@
}
@Override
- public void start() {
- telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
- GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+ GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(config);
- if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
- ManagedChannel channel = ManagedChannelBuilder
- .forAddress(grpcConfig.address(), grpcConfig.port())
- .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
- .usePlaintext(grpcConfig.usePlaintext())
- .build();
+ if (grpcConfig != null && !config.name().equals(GRPC_SCHEME) && config.enabled()) {
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(grpcConfig.address(), grpcConfig.port())
+ .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+ .usePlaintext(grpcConfig.usePlaintext())
+ .build();
- channels.add(channel);
- }
- });
+ channels.put(name, channel);
+ }
+ }
+ @Override
+ public void stop(String name) {
+ ManagedChannel channel = channels.get(name);
+
+ if (channel != null) {
+ channel.shutdown();
+ channels.remove(name);
+ }
+ }
+
+ @Override
+ public void restart(String name) {
+ stop(name);
+ start(name);
+ }
+
+ @Override
+ public void startAll() {
+ telemetryConfigService.getConfigsByType(GRPC).forEach(c -> start(c.name()));
log.info("gRPC producer has Started");
}
@Override
- public void stop() {
- channels.forEach(ManagedChannel::shutdown);
+ public void stopAll() {
+ channels.values().forEach(ManagedChannel::shutdown);
log.info("gRPC producer has Stopped");
}
@Override
- public void restart() {
- stop();
- start();
+ public void restartAll() {
+ stopAll();
+ startAll();
}
@Override
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 781c4e2..962779a 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -93,7 +93,7 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
@@ -101,47 +101,6 @@
}
@Override
- public void start() {
-
- telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
- InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
-
- if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
- StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(INFLUX_PROTOCOL);
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append("//");
- influxDbServerBuilder.append(influxDbConfig.address());
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append(influxDbConfig.port());
-
- InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
- influxDbConfig.username(), influxDbConfig.password());
- producers.put(c.name(), producer);
-
- createDB(producer, influxDbConfig.database());
- }
- });
-
- log.info("InfluxDB producer has Started");
- }
-
- @Override
- public void stop() {
- if (producers != null) {
- producers.values().forEach(InfluxDB::close);
- }
-
- log.info("InfluxDB producer has stopped");
- }
-
- @Override
- public void restart() {
- stop();
- start();
- }
-
- @Override
public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
if (producers == null || producers.isEmpty()) {
log.debug("InfluxDB telemetry service has not been enabled!");
@@ -224,4 +183,63 @@
}
return tpPort.toString();
}
+
+ @Override
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
+
+ if (influxDbConfig != null &&
+ !config.name().equals(INFLUXDB_SCHEME) && config.enabled()) {
+ StringBuilder influxDbServerBuilder = new StringBuilder();
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append("//");
+ influxDbServerBuilder.append(influxDbConfig.address());
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append(influxDbConfig.port());
+
+ InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+ influxDbConfig.username(), influxDbConfig.password());
+ producers.put(config.name(), producer);
+
+ createDB(producer, influxDbConfig.database());
+ }
+ }
+
+ @Override
+ public void stop(String name) {
+ InfluxDB producer = producers.get(name);
+
+ if (producer != null) {
+ producer.close();
+ producers.remove(name);
+ }
+ }
+
+ @Override
+ public void restart(String name) {
+ stop(name);
+ start(name);
+ }
+
+ @Override
+ public void startAll() {
+ telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> start(c.name()));
+ log.info("InfluxDB producer has Started"); }
+
+ @Override
+ public void stopAll() {
+ if (producers != null) {
+ producers.values().forEach(InfluxDB::close);
+ }
+
+ log.info("InfluxDB producer has stopped");
+ }
+
+ @Override
+ public void restartAll() {
+ stopAll();
+ startAll();
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 85f36e3..152bfde 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -83,7 +83,7 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
@@ -91,52 +91,6 @@
}
@Override
- public void start() {
- telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
- KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
-
- if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
- StringBuilder kafkaServerBuilder = new StringBuilder();
- kafkaServerBuilder.append(kafkaConfig.address());
- kafkaServerBuilder.append(":");
- kafkaServerBuilder.append(kafkaConfig.port());
-
- // Configure Kafka server properties
- Properties prop = new Properties();
- prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
- prop.put(RETRIES, kafkaConfig.retries());
- prop.put(ACKS, kafkaConfig.requiredAcks());
- prop.put(BATCH_SIZE, kafkaConfig.batchSize());
- prop.put(LINGER_MS, kafkaConfig.lingerMs());
- prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
- prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
- prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
-
- producers.put(c.name(), new KafkaProducer<>(prop));
- }
- });
-
- log.info("Kafka producer has Started");
- }
-
- @Override
- public void stop() {
- if (!producers.isEmpty()) {
- producers.values().forEach(Producer::close);
- }
-
- producers.clear();
-
- log.info("Kafka producer has Stopped");
- }
-
- @Override
- public void restart() {
- stop();
- start();
- }
-
- @Override
public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
if (producers == null || producers.isEmpty()) {
@@ -170,4 +124,70 @@
public boolean isRunning() {
return !producers.isEmpty();
}
+
+ @Override
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+
+ if (kafkaConfig != null &&
+ !config.name().equals(KAFKA_SCHEME) && config.enabled()) {
+ StringBuilder kafkaServerBuilder = new StringBuilder();
+ kafkaServerBuilder.append(kafkaConfig.address());
+ kafkaServerBuilder.append(":");
+ kafkaServerBuilder.append(kafkaConfig.port());
+
+ // Configure Kafka server properties
+ Properties prop = new Properties();
+ prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+ prop.put(RETRIES, kafkaConfig.retries());
+ prop.put(ACKS, kafkaConfig.requiredAcks());
+ prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+ prop.put(LINGER_MS, kafkaConfig.lingerMs());
+ prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+ prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+ prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+
+ producers.put(config.name(), new KafkaProducer<>(prop));
+ }
+ }
+
+ @Override
+ public void stop(String name) {
+ Producer<String, byte[]> producer = producers.get(name);
+
+ if (producer != null) {
+ producer.close();
+ producers.remove(name);
+ }
+ }
+
+ @Override
+ public void restart(String name) {
+ stop(name);
+ start(name);
+ }
+
+ @Override
+ public void startAll() {
+ telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> start(c.name()));
+ log.info("Kafka producer has Started");
+ }
+
+ @Override
+ public void stopAll() {
+ if (!producers.isEmpty()) {
+ producers.values().forEach(Producer::close);
+ }
+
+ producers.clear();
+
+ log.info("Kafka producer has Stopped");
+ }
+
+ @Override
+ public void restartAll() {
+ stopAll();
+ startAll();
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index ce325cd..d49cdc0 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -154,8 +154,15 @@
TelemetryAdminService service =
telemetryService(event.subject().type().name());
- if (service != null) {
- service.restart();
+ switch (event.type()) {
+ case SERVICE_ENABLED:
+ service.start(event.subject().name());
+ break;
+ case SERVICE_DISABLED:
+ service.stop(event.subject().name());
+ break;
+ default:
+ break;
}
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
index 7111fc0..3ccc02e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
@@ -15,7 +15,7 @@
*/
package org.onosproject.openstacktelemetry.impl;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.MetricsServlet;
@@ -28,6 +28,7 @@
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -37,6 +38,7 @@
import org.slf4j.LoggerFactory;
import java.util.Arrays;
+import java.util.Map;
import java.util.Set;
import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
@@ -51,7 +53,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
+ private Map<String, Server> prometheusExporters = Maps.newConcurrentMap();
private static final String FLOW_TYPE = "flowType";
private static final String DEVICE_ID = "deviceId";
@@ -149,44 +151,21 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
log.info("Stopped");
}
@Override
- public void start() {
+ public void startAll() {
log.info("Prometheus exporter starts.");
- telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
- PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
-
- if (prometheusConfig != null &&
- !c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
- try {
- // TODO Offer a 'Authentication'
- Server prometheusExporter = new Server(prometheusConfig.port());
- ServletContextHandler context = new ServletContextHandler();
- context.setContextPath("/");
- prometheusExporter.setHandler(context);
- context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
-
- log.info("Prometheus server start");
-
- prometheusExporter.start();
-
- prometheusExporters.add(prometheusExporter);
-
- } catch (Exception ex) {
- log.warn("Exception: {}", ex);
- }
- }
- });
+ telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> start(c.name()));
}
@Override
- public void stop() {
- prometheusExporters.forEach(pe -> {
+ public void stopAll() {
+ prometheusExporters.values().forEach(pe -> {
try {
pe.stop();
} catch (Exception e) {
@@ -197,9 +176,9 @@
}
@Override
- public void restart() {
- stop();
- start();
+ public void restartAll() {
+ stopAll();
+ startAll();
}
@Override
@@ -269,4 +248,50 @@
public boolean isRunning() {
return !prometheusExporters.isEmpty();
}
+
+ @Override
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+ PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(config);
+
+ if (prometheusConfig != null &&
+ !config.name().equals(PROMETHEUS_SCHEME) && config.enabled()) {
+ try {
+ // TODO Offer a 'Authentication'
+ Server prometheusExporter = new Server(prometheusConfig.port());
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ prometheusExporter.setHandler(context);
+ context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+ log.info("Prometheus server start");
+
+ prometheusExporter.start();
+
+ prometheusExporters.put(name, prometheusExporter);
+
+ } catch (Exception ex) {
+ log.warn("Exception: {}", ex);
+ }
+ }
+ }
+
+ @Override
+ public void stop(String name) {
+ try {
+ Server pe = prometheusExporters.get(name);
+ if (pe != null) {
+ pe.stop();
+ prometheusExporters.remove(name);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to stop prometheus server due to {}", e);
+ }
+ }
+
+ @Override
+ public void restart(String name) {
+ stop(name);
+ start(name);
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 48cef34..f5641bd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -72,7 +72,7 @@
@Deactivate
protected void deactivate() {
- stop();
+ stopAll();
openstackTelemetryService.removeTelemetryService(this);
@@ -80,42 +80,23 @@
}
@Override
- public void start() {
+ public void startAll() {
- telemetryConfigService.getConfigsByType(REST).forEach(c -> {
- RestTelemetryConfig restConfig = fromTelemetryConfig(c);
-
- if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
- StringBuilder restServerBuilder = new StringBuilder();
- restServerBuilder.append(PROTOCOL);
- restServerBuilder.append(":");
- restServerBuilder.append("//");
- restServerBuilder.append(restConfig.address());
- restServerBuilder.append(":");
- restServerBuilder.append(restConfig.port());
- restServerBuilder.append("/");
-
- Client client = ClientBuilder.newBuilder().build();
-
- WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
-
- targets.put(c.name(), target);
- }
- });
+ telemetryConfigService.getConfigsByType(REST).forEach(c -> start(c.name()));
log.info("REST producer has Started");
}
@Override
- public void stop() {
+ public void stopAll() {
targets.values().forEach(t -> t = null);
log.info("REST producer has Stopped");
}
@Override
- public void restart() {
- stop();
- start();
+ public void restartAll() {
+ stopAll();
+ startAll();
}
@Override
@@ -147,4 +128,45 @@
public boolean isRunning() {
return !targets.isEmpty();
}
+
+ @Override
+ public void start(String name) {
+ TelemetryConfig config = telemetryConfigService.getConfig(name);
+ RestTelemetryConfig restConfig = fromTelemetryConfig(config);
+
+ if (restConfig != null &&
+ !config.name().equals(REST_SCHEME) && config.enabled()) {
+ StringBuilder restServerBuilder = new StringBuilder();
+ restServerBuilder.append(PROTOCOL);
+ restServerBuilder.append(":");
+ restServerBuilder.append("//");
+ restServerBuilder.append(restConfig.address());
+ restServerBuilder.append(":");
+ restServerBuilder.append(restConfig.port());
+ restServerBuilder.append("/");
+
+ Client client = ClientBuilder.newBuilder().build();
+
+ WebTarget target = client.target(
+ restServerBuilder.toString()).path(restConfig.endpoint());
+
+ targets.put(config.name(), target);
+ }
+ }
+
+ @Override
+ public void stop(String name) {
+ WebTarget target = targets.get(name);
+
+ if (target != null) {
+ target = null;
+ targets.remove(name);
+ }
+ }
+
+ @Override
+ public void restart(String name) {
+ stop(name);
+ start(name);
+ }
}