Publish a statistics record to InfluxDB

Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 7e8a102..19acaaa 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -23,6 +23,10 @@
 import org.apache.felix.scr.annotations.Service;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.onlab.packet.TpPort;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
 import org.onosproject.openstacktelemetry.api.InfluxRecord;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
@@ -31,6 +35,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Set;
+
 /**
  * InfluxDB telemetry manager.
  */
@@ -40,11 +46,38 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String FLOW_TYPE = "flowType";
+    private static final String DEVICE_ID = "deviceId";
+    private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+    private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+    private static final String VLAN_ID = "vlanId";
+    private static final String VXLAN_ID = "vxlanId";
+    private static final String SRC_IP = "srcIp";
+    private static final String DST_IP = "dstIp";
+    private static final String SRC_PORT = "srcPort";
+    private static final String DST_PORT = "dstPort";
+    private static final String PROTOCOL = "protocol";
+    private static final String SRC_MAC = "srcMac";
+    private static final String DST_MAC = "dstMac";
+
+    private static final String STARTUP_TIME = "startupTime";
+    private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+    private static final String LST_PKT_OFFSET = "lstPktOffset";
+    private static final String PREV_ACC_BYTES = "prevAccBytes";
+    private static final String PREV_ACC_PKTS = "prevAccPkts";
+    private static final String CURR_ACC_BYTES = "currAccBytes";
+    private static final String CURR_ACC_PKTS = "currAccPkts";
+    private static final String ERROR_PKTS = "errorPkts";
+    private static final String DROP_PKTS = "dropPkts";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService openstackTelemetryService;
 
-    private static final String PROTOCOL = "http";
+    private static final String INFLUX_PROTOCOL = "http";
     private InfluxDB producer = null;
+    private String database = null;
+    private String measurement = null;
 
     @Activate
     protected void activate() {
@@ -73,7 +106,7 @@
         InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
 
         StringBuilder influxDbServerBuilder = new StringBuilder();
-        influxDbServerBuilder.append(PROTOCOL);
+        influxDbServerBuilder.append(INFLUX_PROTOCOL);
         influxDbServerBuilder.append(":");
         influxDbServerBuilder.append("//");
         influxDbServerBuilder.append(influxDbConfig.address());
@@ -82,12 +115,18 @@
 
         producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
                 influxDbConfig.username(), influxDbConfig.password());
+        database = influxDbConfig.database();
+        measurement = influxDbConfig.measurement();
+
         log.info("InfluxDB producer has Started");
+
+        createDB();
     }
 
     @Override
     public void stop() {
         if (producer != null) {
+            producer.close();
             producer = null;
         }
 
@@ -101,16 +140,70 @@
     }
 
     @Override
-    public void publish(InfluxRecord<String, Object> record) {
-        // TODO: need to find a way to invoke InfluxDB endpoint using producer
-
+    public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
         if (producer == null) {
             log.warn("InfluxDB telemetry service has not been enabled!");
+            return;
         }
+
+        if (record.flowInfos().size() == 0) {
+            log.warn("No record to publish");
+            return;
+        }
+
+        log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
+
+        BatchPoints batchPoints = BatchPoints.database(database).build();
+
+        for (FlowInfo flowInfo: record.flowInfos()) {
+            Point point = Point
+                    .measurement((measurement == null) ? record.measurement() : measurement)
+                    .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+                    .tag(DEVICE_ID, flowInfo.deviceId().toString())
+                    .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+                    .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+                    .tag(VLAN_ID, flowInfo.vlanId().toString())
+                    .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+                    .tag(SRC_IP, flowInfo.srcIp().toString())
+                    .tag(DST_IP, flowInfo.dstIp().toString())
+                    .tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
+                    .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+                    .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+                    .tag(SRC_MAC, flowInfo.srcMac().toString())
+                    .tag(DST_MAC, flowInfo.dstMac().toString())
+                    .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+                    .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+                    .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+                    .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+                    .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+                    .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+                    .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+                    .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+                    .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
+                    .build();
+            batchPoints.point(point);
+        }
+        producer.write(batchPoints);
     }
 
     @Override
     public boolean isRunning() {
         return producer != null;
     }
+
+    private void createDB() {
+        if (producer.databaseExists(database)) {
+            log.debug("Database {} is already created", database);
+        } else {
+            producer.createDatabase(database);
+            log.debug("Database {} is created", database);
+        }
+    }
+
+    private String getTpPort(TpPort tpPort) {
+        if (tpPort == null) {
+            return "";
+        }
+        return tpPort.toString();
+    }
 }