Publish a statistics record to InfluxDB
Change-Id: I046207ab16b91c5ff65ae6df9e5929b9979907e1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 7e8a102..19acaaa 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -23,6 +23,10 @@
import org.apache.felix.scr.annotations.Service;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.BatchPoints;
+import org.influxdb.dto.Point;
+import org.onlab.packet.TpPort;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
@@ -31,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
/**
* InfluxDB telemetry manager.
*/
@@ -40,11 +46,38 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ // Keys of the InfluxDB tags that publish() attaches to every point:
+ // flow type, reporting device and its input/output interfaces.
+ private static final String FLOW_TYPE = "flowType";
+ private static final String DEVICE_ID = "deviceId";
+ private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
+ private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
+
+ // Tag keys for the flow's VLAN/VXLAN ids, IP addresses, transport ports,
+ // protocol and MAC addresses.
+ private static final String VLAN_ID = "vlanId";
+ private static final String VXLAN_ID = "vxlanId";
+ private static final String SRC_IP = "srcIp";
+ private static final String DST_IP = "dstIp";
+ private static final String SRC_PORT = "srcPort";
+ private static final String DST_PORT = "dstPort";
+ private static final String PROTOCOL = "protocol";
+ private static final String SRC_MAC = "srcMac";
+ private static final String DST_MAC = "dstMac";
+
+ // Keys of the InfluxDB fields that publish() fills from FlowInfo.statsInfo().
+ private static final String STARTUP_TIME = "startupTime";
+ private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
+ private static final String LST_PKT_OFFSET = "lstPktOffset";
+ private static final String PREV_ACC_BYTES = "prevAccBytes";
+ private static final String PREV_ACC_PKTS = "prevAccPkts";
+ private static final String CURR_ACC_BYTES = "currAccBytes";
+ private static final String CURR_ACC_PKTS = "currAccPkts";
+ private static final String ERROR_PKTS = "errorPkts";
+ private static final String DROP_PKTS = "dropPkts";
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService openstackTelemetryService;
- private static final String PROTOCOL = "http";
+ // URL scheme prepended to the configured address when building the InfluxDB
+ // server URL. Renamed from PROTOCOL because that name now holds the
+ // "protocol" tag key.
+ private static final String INFLUX_PROTOCOL = "http";
+ // InfluxDB client; null whenever the service is not running (see isRunning()).
private InfluxDB producer = null;
+ // Target database name, taken from the InfluxDB telemetry config on start.
+ private String database = null;
+ // Measurement name taken from the config on start; when it is null,
+ // publish() falls back to the measurement carried by the record itself.
+ private String measurement = null;
@Activate
protected void activate() {
@@ -73,7 +106,7 @@
InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(PROTOCOL);
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
influxDbServerBuilder.append(":");
influxDbServerBuilder.append("//");
influxDbServerBuilder.append(influxDbConfig.address());
@@ -82,12 +115,18 @@
producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
influxDbConfig.username(), influxDbConfig.password());
+ database = influxDbConfig.database();
+ measurement = influxDbConfig.measurement();
+
log.info("InfluxDB producer has Started");
+
+ createDB();
}
@Override
public void stop() {
if (producer != null) {
+ producer.close();
producer = null;
}
@@ -101,16 +140,70 @@
}
+    /**
+     * Writes the flow statistics carried by the given record to InfluxDB as
+     * one batch.
+     *
+     * <p>Nothing is written when the producer has not been started or when
+     * the record carries no flow information; both cases are logged as
+     * warnings.
+     *
+     * @param record record holding the flow infos to publish, plus the
+     *               measurement name used when none has been configured
+     */
@Override
- public void publish(InfluxRecord<String, Object> record) {
- // TODO: need to find a way to invoke InfluxDB endpoint using producer
-
+    public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
if (producer == null) {
log.warn("InfluxDB telemetry service has not been enabled!");
+        return;
}
+
+        if (record.flowInfos().isEmpty()) {
+            log.warn("No record to publish");
+            return;
+        }
+
+        log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
+
+        // The measurement name is identical for every point of the batch, so it
+        // is resolved once: the configured measurement wins, the record's own
+        // measurement is the fallback.
+        String measurementName = (measurement == null) ? record.measurement() : measurement;
+
+        BatchPoints batchPoints = BatchPoints.database(database).build();
+        for (FlowInfo flowInfo : record.flowInfos()) {
+            batchPoints.point(toPoint(measurementName, flowInfo));
+        }
+        producer.write(batchPoints);
}
+
+    /**
+     * Converts a single flow info into an InfluxDB point.
+     *
+     * <p>The flow's identifying attributes are written as tags (always as
+     * strings) and the values obtained from {@code statsInfo()} as fields.
+     *
+     * <p>NOTE(review): only the transport ports are null-guarded (through
+     * {@code getTpPort}); {@code deviceId()}, {@code vlanId()},
+     * {@code srcIp()}, {@code dstIp()}, {@code srcMac()}, {@code dstMac()}
+     * and {@code statsInfo()} are dereferenced directly - confirm that the
+     * callers always populate them.
+     *
+     * @param measurementName name of the measurement the point belongs to
+     * @param flowInfo        flow information to convert
+     * @return point carrying the tags and fields of the flow
+     */
+    private Point toPoint(String measurementName, FlowInfo flowInfo) {
+        return Point
+                .measurement(measurementName)
+                .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+                .tag(DEVICE_ID, flowInfo.deviceId().toString())
+                .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+                .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+                .tag(VLAN_ID, flowInfo.vlanId().toString())
+                .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+                .tag(SRC_IP, flowInfo.srcIp().toString())
+                .tag(DST_IP, flowInfo.dstIp().toString())
+                .tag(SRC_PORT, getTpPort(flowInfo.srcPort()))
+                .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+                .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+                .tag(SRC_MAC, flowInfo.srcMac().toString())
+                .tag(DST_MAC, flowInfo.dstMac().toString())
+                .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+                .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+                .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+                .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+                .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+                .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+                .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+                .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+                .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts())
+                .build();
+    }
+ /**
+ * Reports whether this manager currently holds an InfluxDB producer.
+ *
+ * @return true if the producer reference is non-null, false otherwise
+ */
@Override
public boolean isRunning() {
return producer != null;
}
+
+    /**
+     * Makes sure the configured database is present on the InfluxDB server,
+     * creating it when the server does not report it as existing. Either
+     * outcome is logged at debug level.
+     */
+    private void createDB() {
+        boolean alreadyPresent = producer.databaseExists(database);
+        if (alreadyPresent) {
+            log.debug("Database {} is already created", database);
+            return;
+        }
+
+        producer.createDatabase(database);
+        log.debug("Database {} is created", database);
+    }
+
+    /**
+     * Renders a transport port as a tag value.
+     *
+     * @param tpPort transport port, may be null
+     * @return the port's string form, or an empty string when the port is null
+     */
+    private String getTpPort(TpPort tpPort) {
+        return (tpPort == null) ? "" : tpPort.toString();
+    }
}