Refactor: move telemetry config from componentCfg to configs.xml
2. Support exporting metrics to multiple targets
3. Add a set of properties to the Kafka config (key, topic, etc.)
4. Add a distributed store to manage telemetry configs
4. Add CLI to query stored telemetry configs
6. Add a set of telemetry loaders to import XML definitions
7. Add unit tests for the telemetry config, XML config loader and distributed store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 9b4aaf9..48cef34 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -15,8 +15,11 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -32,6 +35,12 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
/**
* REST telemetry manager.
@@ -48,8 +57,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private WebTarget target = null;
- private RestTelemetryConfig restConfig = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, WebTarget> targets = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -69,85 +80,71 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (target != null) {
- log.info("REST producer has already been started");
- return;
- }
+ public void start() {
- restConfig = (RestTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(REST).forEach(c -> {
+ RestTelemetryConfig restConfig = fromTelemetryConfig(c);
- StringBuilder restServerBuilder = new StringBuilder();
- restServerBuilder.append(PROTOCOL);
- restServerBuilder.append(":");
- restServerBuilder.append("//");
- restServerBuilder.append(restConfig.address());
- restServerBuilder.append(":");
- restServerBuilder.append(restConfig.port());
- restServerBuilder.append("/");
+ if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
+ StringBuilder restServerBuilder = new StringBuilder();
+ restServerBuilder.append(PROTOCOL);
+ restServerBuilder.append(":");
+ restServerBuilder.append("//");
+ restServerBuilder.append(restConfig.address());
+ restServerBuilder.append(":");
+ restServerBuilder.append(restConfig.port());
+ restServerBuilder.append("/");
- Client client = ClientBuilder.newBuilder().build();
+ Client client = ClientBuilder.newBuilder().build();
- target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+ WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+
+ targets.put(c.name(), target);
+ }
+ });
log.info("REST producer has Started");
}
@Override
public void stop() {
- if (target != null) {
- target = null;
- }
-
+ targets.clear(); // nulling the lambda parameter was a no-op; empty the map so isRunning() reports false
log.info("REST producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
- public Response publish(String endpoint, String method, String record) {
- // TODO: need to find a way to invoke REST endpoint using target
- return null;
- }
+ public Set<Response> publish(String record) {
- @Override
- public Response publish(String method, String record) {
- switch (method) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
- }
+ Set<Response> responses = Sets.newConcurrentHashSet();
- @Override
- public Response publish(String record) {
+ targets.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ RestTelemetryConfig restConfig = fromTelemetryConfig(config);
- if (target == null) {
- log.debug("REST telemetry service has not been enabled!");
- return null;
- }
+ switch (restConfig.method()) {
+ case POST_METHOD:
+ responses.add(v.request(restConfig.requestMediaType())
+ .post(Entity.json(record)));
+ break;
+ case GET_METHOD:
+ responses.add(v.request(restConfig.requestMediaType()).get());
+ break;
+ default:
+ break;
+ }
+ });
- switch (restConfig.method()) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
+ return responses;
}
@Override
public boolean isRunning() {
- return target != null;
+ return !targets.isEmpty();
}
}