Publish a statistics record to InfluxDB
Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
index 20a4afa..9621467 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
@@ -39,6 +39,7 @@
public static final String DEFAULT_INFLUXDB_USERNAME = "onos";
public static final String DEFAULT_INFLUXDB_PASSWORD = "onos";
public static final String DEFAULT_INFLUXDB_DATABASE = "onos";
+ public static final String DEFAULT_INFLUXDB_MEASUREMENT = "sonaflow";
public static final boolean DEFAULT_INFLUXDB_ENABLE_BATCH = true;
// default configuration variables for Kafka
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
index 1cfd831..4a1bd5c 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.openstacktelemetry.api;
+import java.util.Set;
+
/**
* Service API for publishing openstack telemetry through InfluxDB producer.
*/
@@ -25,5 +27,5 @@
*
* @param record a network metric to be published
*/
- void publish(InfluxRecord<String, Object> record);
+ void publish(InfluxRecord<String, Set<FlowInfo>> record);
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
index 57f2285..241e6e1 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
@@ -20,4 +20,18 @@
*/
public interface InfluxRecord<K, V> {
+ /**
+ * Gets measurement name in InfluxDB.
+ *
+ * @return measurement name
+ */
+ K measurement();
+
+ /**
+ * Gets flow information and its statistics data.
+ *
+ * @return flow information and its statistics data
+ */
+ V flowInfos();
+
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
index de8a36f..32c150f 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
@@ -58,6 +58,13 @@
String database();
/**
+ * Obtains InfluxDB measurement name.
+ *
+ * @return InfluxDB measurement name
+ */
+ String measurement();
+
+ /**
* Obtains InfluxDB enable batch flag.
*
* @return InfluxDB enable batch flag
@@ -108,6 +115,14 @@
*/
Builder withPassword(String password);
+ /**
+ * Sets InfluxDB measurement.
+ *
+ * @param measurement InfluxDB measurement
+ * @return builder instance
+ */
+ Builder withMeasurement(String measurement);
+
/**
* Sets InfluxDB database.
*
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
index 461fee5..c63f2bc 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
@@ -36,18 +36,21 @@
private final String username;
private final String password;
private final String database;
+ private final String measurement;
private final boolean enableBatch;
private final Map<String, Object> configMap;
private DefaultInfluxDbTelemetryConfig(String address, int port,
String username, String password,
- String database, boolean enableBatch,
+ String database, String measurement,
+ boolean enableBatch,
Map<String, Object> configMap) {
this.address = address;
this.port = port;
this.username = username;
this.password = password;
this.database = database;
+ this.measurement = measurement;
this.enableBatch = enableBatch;
this.configMap = configMap;
}
@@ -78,6 +81,11 @@
}
@Override
+ public String measurement() {
+ return measurement;
+ }
+
+ @Override
public boolean enableBatch() {
return enableBatch;
}
@@ -104,6 +112,7 @@
Objects.equals(this.username, other.username) &&
Objects.equals(this.password, other.password) &&
Objects.equals(this.database, other.database) &&
+ Objects.equals(this.measurement, other.measurement) &&
Objects.equals(this.enableBatch, other.enableBatch) &&
Objects.equals(this.configMap, other.configMap);
}
@@ -112,7 +121,8 @@
@Override
public int hashCode() {
- return Objects.hash(address, port, username, password, database, enableBatch, configMap);
+ return Objects.hash(address, port, username, password, database,
+ measurement, enableBatch, configMap);
}
@Override
@@ -123,6 +133,7 @@
.add("username", username)
.add("password", password)
.add("database", database)
+ .add("measurement", measurement)
.add("enableBatch", enableBatch)
.add("configMap", configMap)
.toString();
@@ -143,6 +154,7 @@
private String username;
private String password;
private String database;
+ private String measurement;
private boolean enableBatch;
private Map<String, Object> configMap;
@@ -177,6 +189,12 @@
}
@Override
+ public Builder withMeasurement(String measurement) {
+ this.measurement = measurement;
+ return this;
+ }
+
+ @Override
public Builder withEnableBatch(boolean enableBatch) {
this.enableBatch = enableBatch;
return this;
@@ -194,9 +212,10 @@
checkNotNull(username, "InfluxDB server username cannot be null");
checkNotNull(password, "InfluxDB server password cannot be null");
checkNotNull(database, "InfluxDB server database cannot be null");
+ checkNotNull(measurement, "InfluxDB server measurement cannot be null");
return new DefaultInfluxDbTelemetryConfig(address, port, username,
- password, database, enableBatch, configMap);
+ password, database, measurement, enableBatch, configMap);
}
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java
new file mode 100644
index 0000000..8c45a60
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.onosproject.openstacktelemetry.api.InfluxRecord;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
+
+
+public final class DefaultInfluxRecord<K, V> implements InfluxRecord<K, V> {
+ public static final String MEASUREMENT_NAME = DEFAULT_INFLUXDB_MEASUREMENT;
+ private final K measurement;
+ private final V flowInfos;
+
+ protected DefaultInfluxRecord(K measurement, V flowInfos) {
+ if ((measurement == null) || (measurement.equals(""))) {
+ this.measurement = (K) MEASUREMENT_NAME;
+ } else {
+ this.measurement = measurement;
+ }
+ this.flowInfos = flowInfos;
+ }
+
+ @Override
+ public K measurement() {
+ return measurement;
+ }
+
+ @Override
+ public V flowInfos() {
+ return flowInfos;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj instanceof DefaultInfluxRecord) {
+ final DefaultInfluxRecord other = (DefaultInfluxRecord) obj;
+ return Objects.equals(this.measurement, other.measurement) &&
+ Objects.equals(this.flowInfos, other.flowInfos);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(measurement, flowInfos);
+ }
+
+ public String toString() {
+ return toStringHelper(this)
+ .add("measurement", measurement)
+ .add("flowInfos", flowInfos)
+ .toString();
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
index 6328e64..83c7519 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -26,7 +26,7 @@
private final IpPrefix srcIpPrefix;
private final IpPrefix dstIpPrefix;
private final byte ipProtocol;
- private final TpPort srcTpPort;
+ private final TpPort srcTpPort;
private final TpPort dstTpPort;
private static final String NOT_NULL_MSG = "Element % cannot be null";
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
index 069c374..4ce3e52 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
@@ -38,6 +38,7 @@
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_DATABASE;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_ENABLE_BATCH;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_PASSWORD;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_IP;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_PORT;
@@ -59,6 +60,7 @@
private static final String USERNAME = "username";
private static final String PASSWORD = "password";
private static final String DATABASE = "database";
+ private static final String MEASUREMENT = "measurement";
private static final String ENABLE_BATCH = "enableBatch";
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -87,6 +89,10 @@
label = "Database of InfluxDB server")
protected String database = DEFAULT_INFLUXDB_DATABASE;
+ @Property(name = MEASUREMENT, value = DEFAULT_INFLUXDB_MEASUREMENT,
+ label = "Measurement of InfluxDB server")
+ protected String measurement = DEFAULT_INFLUXDB_MEASUREMENT;
+
@Property(name = ENABLE_BATCH, boolValue = DEFAULT_INFLUXDB_ENABLE_BATCH,
label = "Flag value of enabling batch mode of InfluxDB server")
protected Boolean enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;
@@ -141,6 +147,7 @@
.withUsername(username)
.withPassword(password)
.withDatabase(database)
+ .withMeasurement(measurement)
.withEnableBatch(enableBatch)
.build();
}
@@ -178,6 +185,10 @@
database = databaseStr != null ? databaseStr : DEFAULT_INFLUXDB_DATABASE;
log.info("Configured. InfluxDB server database is {}", database);
+ String measurementStr = Tools.get(properties, MEASUREMENT);
+ measurement = measurementStr != null ? measurementStr : DEFAULT_INFLUXDB_MEASUREMENT;
+ log.info("Configured. InfluxDB server measurement is {}", measurement);
+
Boolean enableBatchConfigured = getBooleanProperty(properties, ENABLE_BATCH);
if (enableBatchConfigured == null) {
enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 7e8a102..19acaaa 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -23,6 +23,10 @@
import org.apache.felix.scr.annotations.Service;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.onlab.packet.TpPort;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
@@ -31,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
/**
* InfluxDB telemetry manager.
*/
@@ -40,11 +46,38 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String FLOW_TYPE = "flowType";
+ private static final String DEVICE_ID = "deviceId";
+ private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+ private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+ private static final String VLAN_ID = "vlanId";
+ private static final String VXLAN_ID = "vxlanId";
+ private static final String SRC_IP = "srcIp";
+ private static final String DST_IP = "dstIp";
+ private static final String SRC_PORT = "srcPort";
+ private static final String DST_PORT = "dstPort";
+ private static final String PROTOCOL = "protocol";
+ private static final String SRC_MAC = "srcMac";
+ private static final String DST_MAC = "dstMac";
+
+ private static final String STARTUP_TIME = "startupTime";
+ private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+ private static final String LST_PKT_OFFSET = "lstPktOffset";
+ private static final String PREV_ACC_BYTES = "prevAccBytes";
+ private static final String PREV_ACC_PKTS = "prevAccPkts";
+ private static final String CURR_ACC_BYTES = "currAccBytes";
+ private static final String CURR_ACC_PKTS = "currAccPkts";
+ private static final String ERROR_PKTS = "errorPkts";
+ private static final String DROP_PKTS = "dropPkts";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService openstackTelemetryService;
- private static final String PROTOCOL = "http";
+ private static final String INFLUX_PROTOCOL = "http";
private InfluxDB producer = null;
+ private String database = null;
+ private String measurement = null;
@Activate
protected void activate() {
@@ -73,7 +106,7 @@
InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(PROTOCOL);
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
influxDbServerBuilder.append(":");
influxDbServerBuilder.append("//");
influxDbServerBuilder.append(influxDbConfig.address());
@@ -82,12 +115,18 @@
producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
influxDbConfig.username(), influxDbConfig.password());
+ database = influxDbConfig.database();
+ measurement = influxDbConfig.measurement();
+
log.info("InfluxDB producer has Started");
+
+ createDB();
}
@Override
public void stop() {
if (producer != null) {
+ producer.close();
producer = null;
}
@@ -101,16 +140,70 @@
}
@Override
- public void publish(InfluxRecord<String, Object> record) {
- // TODO: need to find a way to invoke InfluxDB endpoint using producer
-
+ public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
if (producer == null) {
log.warn("InfluxDB telemetry service has not been enabled!");
+ return;
}
+
+ if (record.flowInfos().size() == 0) {
+ log.warn("No record to publish");
+ return;
+ }
+
+ log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
+
+ BatchPoints batchPoints = BatchPoints.database(database).build();
+
+ for (FlowInfo flowInfo: record.flowInfos()) {
+ Point point = Point
+ .measurement((measurement == null) ? record.measurement() : measurement)
+ .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+ .tag(DEVICE_ID, flowInfo.deviceId().toString())
+ .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+ .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+ .tag(VLAN_ID, flowInfo.vlanId().toString())
+ .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+ .tag(SRC_IP, flowInfo.srcIp().toString())
+ .tag(DST_IP, flowInfo.dstIp().toString())
+ .tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
+ .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+ .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+ .tag(SRC_MAC, flowInfo.srcMac().toString())
+ .tag(DST_MAC, flowInfo.dstMac().toString())
+ .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+ .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+ .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+ .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+ .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+ .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+ .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+ .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+ .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
+ .build();
+ batchPoints.point(point);
+ }
+ producer.write(batchPoints);
}
@Override
public boolean isRunning() {
return producer != null;
}
+
+ private void createDB() {
+ if (producer.databaseExists(database)) {
+ log.debug("Database {} is already created", database);
+ } else {
+ producer.createDatabase(database);
+ log.debug("Database {} is created", database);
+ }
+ }
+
+ private String getTpPort(TpPort tpPort) {
+ if (tpPort == null) {
+ return "";
+ }
+ return tpPort.toString();
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index f3f386f..e19a579 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
@@ -73,7 +74,6 @@
@Override
public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
-
if (service instanceof GrpcTelemetryManager) {
invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
@@ -99,7 +99,9 @@
}
private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
- // TODO: need provide implementation
+ DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
+ = new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
+ service.publish(influxRecord);
}
private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
index 3c95f6d..4af5b4a 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
@@ -40,6 +40,9 @@
private static final String DATABASE_1 = "database1";
private static final String DATABASE_2 = "database2";
+ private static final String MEASUREMENT_1 = "measurement1";
+ private static final String MEASUREMENT_2 = "measurement2";
+
private static final String USERNAME_1 = "username1";
private static final String USERNAME_2 = "username2";
@@ -72,6 +75,7 @@
.withAddress(IP_ADDRESS_1)
.withPort(PORT_1)
.withDatabase(DATABASE_1)
+ .withMeasurement(MEASUREMENT_1)
.withUsername(USERNAME_1)
.withPassword(PASSWORD_1)
.withEnableBatch(ENABLE_BATCH_1)
@@ -82,6 +86,7 @@
.withAddress(IP_ADDRESS_1)
.withPort(PORT_1)
.withDatabase(DATABASE_1)
+ .withMeasurement(MEASUREMENT_1)
.withUsername(USERNAME_1)
.withPassword(PASSWORD_1)
.withEnableBatch(ENABLE_BATCH_1)
@@ -92,6 +97,7 @@
.withAddress(IP_ADDRESS_2)
.withPort(PORT_2)
.withDatabase(DATABASE_2)
+ .withMeasurement(MEASUREMENT_2)
.withUsername(USERNAME_2)
.withPassword(PASSWORD_2)
.withEnableBatch(ENABLE_BATCH_2)
@@ -113,6 +119,7 @@
assertThat(config.address(), is(IP_ADDRESS_1));
assertThat(config.port(), is(PORT_1));
assertThat(config.database(), is(DATABASE_1));
+ assertThat(config.measurement(), is(MEASUREMENT_1));
assertThat(config.username(), is(USERNAME_1));
assertThat(config.password(), is(PASSWORD_1));
assertThat(config.enableBatch(), is(ENABLE_BATCH_1));
diff --git a/apps/openstacktelemetry/pom.xml b/apps/openstacktelemetry/pom.xml
index 0bf8590..4d40139 100644
--- a/apps/openstacktelemetry/pom.xml
+++ b/apps/openstacktelemetry/pom.xml
@@ -31,7 +31,7 @@
<description>SONA Openstack Telemetry Application</description>
<properties>
- <influxdb-java.version>2.2</influxdb-java.version>
+ <influxdb-java.version>2.9</influxdb-java.version>
<kafka-client.version>0.8.2.2_1</kafka-client.version>
<protobuf-java.version>3.2.0</protobuf-java.version>
<io-grpc.version>1.3.1</io-grpc.version>