Refactor: move telemetry config from componentCfg to configs.xml
1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for the telemetry config, the XML config loader and the distributed store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index 391b1d8..f487504 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,12 +15,13 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -29,6 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
+
/**
* gRPC telemetry manager.
*/
@@ -40,7 +47,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private ManagedChannel channel = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
@Activate
protected void activate() {
@@ -60,43 +70,41 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (channel != null) {
- log.info("gRPC producer has already been started");
- return;
- }
+ public void start() {
+ telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
+ GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
- GrpcTelemetryConfig grpcConfig = (GrpcTelemetryConfig) config;
- channel = ManagedChannelBuilder
- .forAddress(grpcConfig.address(), grpcConfig.port())
- .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
- .usePlaintext(grpcConfig.usePlaintext())
- .build();
+ if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(grpcConfig.address(), grpcConfig.port())
+ .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+ .usePlaintext(grpcConfig.usePlaintext())
+ .build();
+
+ channels.add(channel);
+ }
+ });
log.info("gRPC producer has Started");
}
@Override
public void stop() {
- if (channel != null) {
- channel.shutdown();
- channel = null;
- }
-
+ // Shut down every channel opened by start(), then forget them so that
+ // isRunning() reports false after a stop and restart() does not pile
+ // new channels on top of already-terminated ones.
+ channels.forEach(ManagedChannel::shutdown);
+ channels.clear();
log.info("gRPC producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public Object publish(Object record) {
// TODO: need to find a way to invoke gRPC endpoint using channel
- if (channel == null) {
+ if (channels.isEmpty()) {
log.debug("gRPC telemetry service has not been enabled!");
}
@@ -105,6 +113,6 @@
@Override
public boolean isRunning() {
- return channel != null;
+ return !channels.isEmpty();
}
}