Refactor: move telemetry config from componentCfg to configs.xml
1. Support exporting metrics to multiple targets
2. Add a set of properties to Kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for telemetry config, XML config loader and distributed store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 40528d4..796da19 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -24,6 +24,7 @@
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -34,8 +35,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
+
/**
* InfluxDB telemetry manager.
*/
@@ -70,10 +76,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
private static final String INFLUX_PROTOCOL = "http";
- private InfluxDB producer = null;
- private String database = null;
- private String measurement = null;
+ private Map<String, InfluxDB> producers = null;
@Activate
protected void activate() {
@@ -93,51 +100,49 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("InfluxDB producer has already been started");
- return;
- }
+ public void start() {
- InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
- StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(INFLUX_PROTOCOL);
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append("//");
- influxDbServerBuilder.append(influxDbConfig.address());
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append(influxDbConfig.port());
+ if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
+ StringBuilder influxDbServerBuilder = new StringBuilder();
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append("//");
+ influxDbServerBuilder.append(influxDbConfig.address());
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append(influxDbConfig.port());
- producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
- influxDbConfig.username(), influxDbConfig.password());
- database = influxDbConfig.database();
- measurement = influxDbConfig.measurement();
+ InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+ influxDbConfig.username(), influxDbConfig.password());
+ producers.put(c.name(), producer);
+
+ createDB(producer, influxDbConfig.database());
+ }
+ });
log.info("InfluxDB producer has Started");
-
- createDB();
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (producers != null) {
+ producers.values().forEach(InfluxDB::close);
}
log.info("InfluxDB producer has stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("InfluxDB telemetry service has not been enabled!");
return;
}
@@ -149,53 +154,61 @@
log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
- BatchPoints batchPoints = BatchPoints.database(database).build();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
- for (FlowInfo flowInfo: record.flowInfos()) {
- Point.Builder pointBuilder = Point
- .measurement((measurement == null) ? record.measurement() : measurement)
- .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
- .tag(DEVICE_ID, flowInfo.deviceId().toString())
- .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
- .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
- .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
- .tag(SRC_IP, flowInfo.srcIp().toString())
- .tag(DST_IP, flowInfo.dstIp().toString())
- .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
- .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
- .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
- .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
- .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
- .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
- .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
- .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
- .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
- .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
- .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+ String database = influxDbConfig.database();
+ String measurement = influxDbConfig.measurement();
- if (flowInfo.vlanId() != null) {
- pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ BatchPoints batchPoints = BatchPoints.database(database).build();
+
+ for (FlowInfo flowInfo: record.flowInfos()) {
+ Point.Builder pointBuilder = Point
+ .measurement((measurement == null) ? record.measurement() : measurement)
+ .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+ .tag(DEVICE_ID, flowInfo.deviceId().toString())
+ .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+ .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+ .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+ .tag(SRC_IP, flowInfo.srcIp().toString())
+ .tag(DST_IP, flowInfo.dstIp().toString())
+ .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+ .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+ .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+ .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+ .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+ .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+ .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+ .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+ .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+ .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+ .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+
+ if (flowInfo.vlanId() != null) {
+ pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ }
+
+ if (flowInfo.srcPort() != null) {
+ pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
+ }
+
+ if (flowInfo.dstPort() != null) {
+ pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
+ }
+
+ batchPoints.point(pointBuilder.build());
}
-
- if (flowInfo.srcPort() != null) {
- pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
- }
-
- if (flowInfo.dstPort() != null) {
- pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
- }
-
- batchPoints.point(pointBuilder.build());
- }
- producer.write(batchPoints);
+ v.write(batchPoints);
+ });
}
@Override
public boolean isRunning() {
- return producer != null;
+ return !producers.isEmpty();
}
- private void createDB() {
+ private void createDB(InfluxDB producer, String database) {
if (producer.databaseExists(database)) {
log.debug("Database {} is already created", database);
} else {