Refactor: move telemetry config from componentCfg to configs.xml

1. Support exporting metrics to multiple targets
2. Add a set of properties to the Kafka config (key, topic, etc.)
3. Add a distributed store to manage telemetry configs
4. Add a CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import XML definitions
6. Add unit tests for the telemetry config, XML config loader and distributed store
7. Add missing Javadoc for a set of implementation classes

Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 40528d4..796da19 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -24,6 +24,7 @@
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.InfluxRecord;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
 import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
 import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
 import org.osgi.service.component.annotations.Activate;
@@ -34,8 +35,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.Set;
 
+import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
+
 /**
  * InfluxDB telemetry manager.
  */
@@ -70,10 +76,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigService telemetryConfigService;
+
     private static final String INFLUX_PROTOCOL = "http";
-    private InfluxDB producer = null;
-    private String database = null;
-    private String measurement = null;
+    private Map<String, InfluxDB> producers = null;
 
     @Activate
     protected void activate() {
@@ -93,51 +100,49 @@
     }
 
     @Override
-    public void start(TelemetryConfig config) {
-        if (producer != null) {
-            log.info("InfluxDB producer has already been started");
-            return;
-        }
+    public void start() {
 
-        InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
+        telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
+            InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
 
-        StringBuilder influxDbServerBuilder = new StringBuilder();
-        influxDbServerBuilder.append(INFLUX_PROTOCOL);
-        influxDbServerBuilder.append(":");
-        influxDbServerBuilder.append("//");
-        influxDbServerBuilder.append(influxDbConfig.address());
-        influxDbServerBuilder.append(":");
-        influxDbServerBuilder.append(influxDbConfig.port());
+            if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
+                StringBuilder influxDbServerBuilder = new StringBuilder();
+                influxDbServerBuilder.append(INFLUX_PROTOCOL);
+                influxDbServerBuilder.append(":");
+                influxDbServerBuilder.append("//");
+                influxDbServerBuilder.append(influxDbConfig.address());
+                influxDbServerBuilder.append(":");
+                influxDbServerBuilder.append(influxDbConfig.port());
 
-        producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
-                influxDbConfig.username(), influxDbConfig.password());
-        database = influxDbConfig.database();
-        measurement = influxDbConfig.measurement();
+                InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+                        influxDbConfig.username(), influxDbConfig.password());
+                producers.put(c.name(), producer);
+
+                createDB(producer, influxDbConfig.database());
+            }
+        });
 
         log.info("InfluxDB producer has Started");
-
-        createDB();
     }
 
     @Override
     public void stop() {
-        if (producer != null) {
-            producer.close();
-            producer = null;
+        if (producers != null) {
+            producers.values().forEach(InfluxDB::close);
         }
 
         log.info("InfluxDB producer has stopped");
     }
 
     @Override
-    public void restart(TelemetryConfig config) {
+    public void restart() {
         stop();
-        start(config);
+        start();
     }
 
     @Override
     public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
-        if (producer == null) {
+        if (producers == null || producers.isEmpty()) {
             log.debug("InfluxDB telemetry service has not been enabled!");
             return;
         }
@@ -149,53 +154,61 @@
 
         log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
 
-        BatchPoints batchPoints = BatchPoints.database(database).build();
+        producers.forEach((k, v) -> {
+            TelemetryConfig config = telemetryConfigService.getConfig(k);
+            InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
 
-        for (FlowInfo flowInfo: record.flowInfos()) {
-            Point.Builder pointBuilder = Point
-                    .measurement((measurement == null) ? record.measurement() : measurement)
-                    .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
-                    .tag(DEVICE_ID, flowInfo.deviceId().toString())
-                    .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
-                    .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
-                    .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
-                    .tag(SRC_IP, flowInfo.srcIp().toString())
-                    .tag(DST_IP, flowInfo.dstIp().toString())
-                    .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
-                    .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
-                    .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
-                    .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
-                    .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
-                    .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
-                    .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
-                    .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
-                    .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
-                    .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
-                    .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+            String database = influxDbConfig.database();
+            String measurement = influxDbConfig.measurement();
 
-            if (flowInfo.vlanId() != null) {
-                pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+            BatchPoints batchPoints = BatchPoints.database(database).build();
+
+            for (FlowInfo flowInfo: record.flowInfos()) {
+                Point.Builder pointBuilder = Point
+                        .measurement((measurement == null) ? record.measurement() : measurement)
+                        .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+                        .tag(DEVICE_ID, flowInfo.deviceId().toString())
+                        .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+                        .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+                        .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+                        .tag(SRC_IP, flowInfo.srcIp().toString())
+                        .tag(DST_IP, flowInfo.dstIp().toString())
+                        .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+                        .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+                        .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+                        .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+                        .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+                        .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+                        .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+                        .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+                        .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+                        .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+                        .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+
+                if (flowInfo.vlanId() != null) {
+                    pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+                }
+
+                if (flowInfo.srcPort() != null) {
+                    pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
+                }
+
+                if (flowInfo.dstPort() != null) {
+                    pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
+                }
+
+                batchPoints.point(pointBuilder.build());
             }
-
-            if (flowInfo.srcPort() != null) {
-                pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
-            }
-
-            if (flowInfo.dstPort() != null) {
-                pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
-            }
-
-            batchPoints.point(pointBuilder.build());
-        }
-        producer.write(batchPoints);
+            v.write(batchPoints);
+        });
     }
 
     @Override
     public boolean isRunning() {
-        return producer != null;
+        return !producers.isEmpty();
     }
 
-    private void createDB() {
+    private void createDB(InfluxDB producer, String database) {
         if (producer.databaseExists(database)) {
             log.debug("Database {} is already created", database);
         } else {