Publish a statistics record to InfluxDB

Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
index 20a4afa..9621467 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
@@ -39,6 +39,7 @@
     public static final String DEFAULT_INFLUXDB_USERNAME = "onos";
     public static final String DEFAULT_INFLUXDB_PASSWORD = "onos";
     public static final String DEFAULT_INFLUXDB_DATABASE = "onos";
+    public static final String DEFAULT_INFLUXDB_MEASUREMENT = "sonaflow";
     public static final boolean DEFAULT_INFLUXDB_ENABLE_BATCH = true;
 
     // default configuration variables for Kafka
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
index 1cfd831..4a1bd5c 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.openstacktelemetry.api;
 
+import java.util.Set;
+
 /**
  * Service API for publishing openstack telemetry through InfluxDB producer.
  */
@@ -25,5 +27,5 @@
      *
      * @param record a network metric to be published
      */
-    void publish(InfluxRecord<String, Object> record);
+    void publish(InfluxRecord<String, Set<FlowInfo>> record);
 }
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
index 57f2285..241e6e1 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxRecord.java
@@ -20,4 +20,18 @@
  */
 public interface InfluxRecord<K, V> {
 
+    /**
+     * Gets measurement name in InfluxDB.
+     *
+     * @return measurement name
+     */
+    K measurement();
+
+    /**
+     * Gets flow information and its statistics data.
+     *
+     * @return flow information and its statistics data
+     */
+    V flowInfos();
+
 }
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
index de8a36f..32c150f 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
@@ -58,6 +58,13 @@
     String database();
 
     /**
+     * Obtains InfluxDB measurement name.
+     *
+     * @return InfluxDB measurement name
+     */
+    String measurement();
+
+    /**
      * Obtains InfluxDB enable batch flag.
      *
      * @return InfluxDB enable batch flag
@@ -108,6 +115,14 @@
          */
         Builder withPassword(String password);
 
+         /**
+         * Sets InfluxDB measurement.
+         *
+         * @param measurement InfluxDB measurement
+         * @return builder instance
+         */
+        Builder withMeasurement(String measurement);
+
         /**
          * Sets InfluxDB database.
          *
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
index 461fee5..c63f2bc 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
@@ -36,18 +36,21 @@
     private final String username;
     private final String password;
     private final String database;
+    private final String measurement;
     private final boolean enableBatch;
     private final Map<String, Object> configMap;
 
     private DefaultInfluxDbTelemetryConfig(String address, int port,
                                            String username, String password,
-                                           String database, boolean enableBatch,
+                                           String database, String measurement,
+                                           boolean enableBatch,
                                            Map<String, Object> configMap) {
         this.address = address;
         this.port = port;
         this.username = username;
         this.password = password;
         this.database = database;
+        this.measurement = measurement;
         this.enableBatch = enableBatch;
         this.configMap = configMap;
     }
@@ -78,6 +81,11 @@
     }
 
     @Override
+    public String measurement() {
+        return measurement;
+    }
+
+    @Override
     public boolean enableBatch() {
         return enableBatch;
     }
@@ -104,6 +112,7 @@
                     Objects.equals(this.username, other.username) &&
                     Objects.equals(this.password, other.password) &&
                     Objects.equals(this.database, other.database) &&
+                    Objects.equals(this.measurement, other.measurement) &&
                     Objects.equals(this.enableBatch, other.enableBatch) &&
                     Objects.equals(this.configMap, other.configMap);
         }
@@ -112,7 +121,8 @@
 
     @Override
     public int hashCode() {
-        return Objects.hash(address, port, username, password, database, enableBatch, configMap);
+        return Objects.hash(address, port, username, password, database,
+                            measurement, enableBatch, configMap);
     }
 
     @Override
@@ -123,6 +133,7 @@
                 .add("username", username)
                 .add("password", password)
                 .add("database", database)
+                .add("measurement", measurement)
                 .add("enableBatch", enableBatch)
                 .add("configMap", configMap)
                 .toString();
@@ -143,6 +154,7 @@
         private String username;
         private String password;
         private String database;
+        private String measurement;
         private boolean enableBatch;
         private Map<String, Object> configMap;
 
@@ -177,6 +189,12 @@
         }
 
         @Override
+        public Builder withMeasurement(String measurement) {
+            this.measurement = measurement;
+            return this;
+        }
+
+        @Override
         public Builder withEnableBatch(boolean enableBatch) {
             this.enableBatch = enableBatch;
             return this;
@@ -194,9 +212,10 @@
             checkNotNull(username, "InfluxDB server username cannot be null");
             checkNotNull(password, "InfluxDB server password cannot be null");
             checkNotNull(database, "InfluxDB server database cannot be null");
+            checkNotNull(measurement, "InfluxDB server measurement cannot be null");
 
             return new DefaultInfluxDbTelemetryConfig(address, port, username,
-                    password, database, enableBatch, configMap);
+                    password, database, measurement, enableBatch, configMap);
         }
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java
new file mode 100644
index 0000000..8c45a60
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultInfluxRecord.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.onosproject.openstacktelemetry.api.InfluxRecord;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
+
+
+public final class DefaultInfluxRecord<K, V> implements InfluxRecord<K, V> {
+    public static final String MEASUREMENT_NAME = DEFAULT_INFLUXDB_MEASUREMENT;
+    private final K measurement;
+    private final V flowInfos;
+
+    protected DefaultInfluxRecord(K measurement, V flowInfos) {
+        if ((measurement == null) || (measurement.equals(""))) {
+            this.measurement = (K) MEASUREMENT_NAME;
+        } else {
+            this.measurement = measurement;
+        }
+        this.flowInfos = flowInfos;
+    }
+
+    @Override
+    public K measurement() {
+        return measurement;
+    }
+
+    @Override
+    public V flowInfos() {
+        return flowInfos;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (obj instanceof  DefaultInfluxRecord) {
+            final DefaultInfluxRecord other = (DefaultInfluxRecord) obj;
+            return Objects.equals(this.measurement, other.measurement) &&
+                    Objects.equals(this.flowInfos, other.flowInfos);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(measurement, flowInfos);
+    }
+
+    public String toString() {
+        return toStringHelper(this)
+                .add("measurement", measurement)
+                .add("flowInfos", flowInfos)
+                .toString();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
index 6328e64..83c7519 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -26,7 +26,7 @@
     private final IpPrefix srcIpPrefix;
     private final IpPrefix dstIpPrefix;
     private final byte     ipProtocol;
-    private final TpPort srcTpPort;
+    private final TpPort   srcTpPort;
     private final TpPort   dstTpPort;
 
     private static final String NOT_NULL_MSG = "Element % cannot be null";
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
index 069c374..4ce3e52 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
@@ -38,6 +38,7 @@
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DISABLE;
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_DATABASE;
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_ENABLE_BATCH;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_PASSWORD;
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_IP;
 import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_SERVER_PORT;
@@ -59,6 +60,7 @@
     private static final String USERNAME = "username";
     private static final String PASSWORD = "password";
     private static final String DATABASE = "database";
+    private static final String MEASUREMENT = "measurement";
     private static final String ENABLE_BATCH = "enableBatch";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -87,6 +89,10 @@
             label = "Database of InfluxDB server")
     protected String database = DEFAULT_INFLUXDB_DATABASE;
 
+    @Property(name = MEASUREMENT, value = DEFAULT_INFLUXDB_MEASUREMENT,
+            label = "Measurement of InfluxDB server")
+    protected String measurement = DEFAULT_INFLUXDB_MEASUREMENT;
+
     @Property(name = ENABLE_BATCH, boolValue = DEFAULT_INFLUXDB_ENABLE_BATCH,
             label = "Flag value of enabling batch mode of InfluxDB server")
     protected Boolean enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;
@@ -141,6 +147,7 @@
                 .withUsername(username)
                 .withPassword(password)
                 .withDatabase(database)
+                .withMeasurement(measurement)
                 .withEnableBatch(enableBatch)
                 .build();
     }
@@ -178,6 +185,10 @@
         database = databaseStr != null ? databaseStr : DEFAULT_INFLUXDB_DATABASE;
         log.info("Configured. InfluxDB server database is {}", database);
 
+        String measurementStr = Tools.get(properties, MEASUREMENT);
+        measurement = measurementStr != null ? measurementStr : DEFAULT_INFLUXDB_MEASUREMENT;
+        log.info("Configured. InfluxDB server measurement is {}", measurement);
+
         Boolean enableBatchConfigured = getBooleanProperty(properties, ENABLE_BATCH);
         if (enableBatchConfigured == null) {
             enableBatch = DEFAULT_INFLUXDB_ENABLE_BATCH;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 7e8a102..19acaaa 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -23,6 +23,10 @@
 import org.apache.felix.scr.annotations.Service;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.onlab.packet.TpPort;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.InfluxRecord;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
@@ -31,6 +35,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
+
 /**
  * InfluxDB telemetry manager.
  */
@@ -40,11 +46,38 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String FLOW_TYPE = "flowType";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+    private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+    private static final String VLAN_ID = "vlanId";
+    private static final String VXLAN_ID = "vxlanId";
+    private static final String SRC_IP = "srcIp";
+    private static final String DST_IP = "dstIp";
+    private static final String SRC_PORT = "srcPort";
+    private static final String DST_PORT = "dstPort";
+    private static final String PROTOCOL = "protocol";
+    private static final String SRC_MAC = "srcMac";
+    private static final String DST_MAC = "dstMac";
+
+    private static final String STARTUP_TIME = "startupTime";
+    private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+    private static final String LST_PKT_OFFSET = "lstPktOffset";
+    private static final String PREV_ACC_BYTES = "prevAccBytes";
+    private static final String PREV_ACC_PKTS = "prevAccPkts";
+    private static final String CURR_ACC_BYTES = "currAccBytes";
+    private static final String CURR_ACC_PKTS = "currAccPkts";
+    private static final String ERROR_PKTS = "errorPkts";
+    private static final String DROP_PKTS = "dropPkts";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private static final String PROTOCOL = "http";
+    private static final String INFLUX_PROTOCOL = "http";
     private InfluxDB producer = null;
+    private String database = null;
+    private String measurement = null;
 
     @Activate
     protected void activate() {
@@ -73,7 +106,7 @@
         InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
 
         StringBuilder influxDbServerBuilder = new StringBuilder();
-        influxDbServerBuilder.append(PROTOCOL);
+        influxDbServerBuilder.append(INFLUX_PROTOCOL);
         influxDbServerBuilder.append(":");
         influxDbServerBuilder.append("//");
         influxDbServerBuilder.append(influxDbConfig.address());
@@ -82,12 +115,18 @@
 
         producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
                 influxDbConfig.username(), influxDbConfig.password());
+        database = influxDbConfig.database();
+        measurement = influxDbConfig.measurement();
+
         log.info("InfluxDB producer has Started");
+
+        createDB();
     }
 
     @Override
     public void stop() {
         if (producer != null) {
+            producer.close();
             producer = null;
         }
 
@@ -101,16 +140,70 @@
     }
 
     @Override
-    public void publish(InfluxRecord<String, Object> record) {
-        // TODO: need to find a way to invoke InfluxDB endpoint using producer
-
+    public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
         if (producer == null) {
             log.warn("InfluxDB telemetry service has not been enabled!");
+            return;
         }
+
+        if (record.flowInfos().size() == 0) {
+            log.warn("No record to publish");
+            return;
+        }
+
+        log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
+
+        BatchPoints batchPoints = BatchPoints.database(database).build();
+
+        for (FlowInfo flowInfo: record.flowInfos()) {
+            Point point = Point
+                    .measurement((measurement == null) ? record.measurement() : measurement)
+                    .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+                    .tag(DEVICE_ID, flowInfo.deviceId().toString())
+                    .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+                    .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+                    .tag(VLAN_ID, flowInfo.vlanId().toString())
+                    .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+                    .tag(SRC_IP, flowInfo.srcIp().toString())
+                    .tag(DST_IP, flowInfo.dstIp().toString())
+                    .tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
+                    .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+                    .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+                    .tag(SRC_MAC, flowInfo.srcMac().toString())
+                    .tag(DST_MAC, flowInfo.dstMac().toString())
+                    .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+                    .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+                    .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+                    .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+                    .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+                    .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+                    .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+                    .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+                    .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
+                    .build();
+            batchPoints.point(point);
+        }
+        producer.write(batchPoints);
     }
 
     @Override
     public boolean isRunning() {
         return producer != null;
     }
+
+    private void createDB() {
+        if (producer.databaseExists(database)) {
+            log.debug("Database {} is already created", database);
+        } else {
+            producer.createDatabase(database);
+            log.debug("Database {} is created", database);
+        }
+    }
+
+    private String getTpPort(TpPort tpPort) {
+        if (tpPort == null) {
+            return "";
+        }
+        return tpPort.toString();
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index f3f386f..e19a579 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Set;
 
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
 import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
 import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
 
@@ -73,7 +74,6 @@
     @Override
     public void publish(Set<FlowInfo> flowInfos) {
         telemetryServices.forEach(service -> {
-
             if (service instanceof GrpcTelemetryManager) {
                 invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
             }
@@ -99,7 +99,9 @@
     }
 
     private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
-        // TODO: need provide implementation
+        DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
+                = new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
+        service.publish(influxRecord);
     }
 
     private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
index 3c95f6d..4af5b4a 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
@@ -40,6 +40,9 @@
     private static final String DATABASE_1 = "database1";
     private static final String DATABASE_2 = "database2";
 
+    private static final String MEASUREMENT_1 = "measurement1";
+    private static final String MEASUREMENT_2 = "measurement2";
+
     private static final String USERNAME_1 = "username1";
     private static final String USERNAME_2 = "username2";
 
@@ -72,6 +75,7 @@
                 .withAddress(IP_ADDRESS_1)
                 .withPort(PORT_1)
                 .withDatabase(DATABASE_1)
+                .withMeasurement(MEASUREMENT_1)
                 .withUsername(USERNAME_1)
                 .withPassword(PASSWORD_1)
                 .withEnableBatch(ENABLE_BATCH_1)
@@ -82,6 +86,7 @@
                 .withAddress(IP_ADDRESS_1)
                 .withPort(PORT_1)
                 .withDatabase(DATABASE_1)
+                .withMeasurement(MEASUREMENT_1)
                 .withUsername(USERNAME_1)
                 .withPassword(PASSWORD_1)
                 .withEnableBatch(ENABLE_BATCH_1)
@@ -92,6 +97,7 @@
                 .withAddress(IP_ADDRESS_2)
                 .withPort(PORT_2)
                 .withDatabase(DATABASE_2)
+                .withMeasurement(MEASUREMENT_2)
                 .withUsername(USERNAME_2)
                 .withPassword(PASSWORD_2)
                 .withEnableBatch(ENABLE_BATCH_2)
@@ -113,6 +119,7 @@
         assertThat(config.address(), is(IP_ADDRESS_1));
         assertThat(config.port(), is(PORT_1));
         assertThat(config.database(), is(DATABASE_1));
+        assertThat(config.measurement(), is(MEASUREMENT_1));
         assertThat(config.username(), is(USERNAME_1));
         assertThat(config.password(), is(PASSWORD_1));
         assertThat(config.enableBatch(), is(ENABLE_BATCH_1));
diff --git a/apps/openstacktelemetry/pom.xml b/apps/openstacktelemetry/pom.xml
index 0bf8590..4d40139 100644
--- a/apps/openstacktelemetry/pom.xml
+++ b/apps/openstacktelemetry/pom.xml
@@ -31,7 +31,7 @@
     <description>SONA Openstack Telemetry Application</description>
 
     <properties>
-        <influxdb-java.version>2.2</influxdb-java.version>
+        <influxdb-java.version>2.9</influxdb-java.version>
         <kafka-client.version>0.8.2.2_1</kafka-client.version>
         <protobuf-java.version>3.2.0</protobuf-java.version>
         <io-grpc.version>1.3.1</io-grpc.version>