blob: 40528d46720b153d4b8ab07d4ccd557dc06d8a15 [file] [log] [blame]
Jian Li6803ccd2018-06-08 09:26:09 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacktelemetry.impl;
17
Jian Li6803ccd2018-06-08 09:26:09 +090018import org.influxdb.InfluxDB;
19import org.influxdb.InfluxDBFactory;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090020import org.influxdb.dto.BatchPoints;
21import org.influxdb.dto.Point;
22import org.onlab.packet.TpPort;
23import org.onosproject.openstacktelemetry.api.FlowInfo;
Jian Li6803ccd2018-06-08 09:26:09 +090024import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
25import org.onosproject.openstacktelemetry.api.InfluxRecord;
Jian Lid1ce10a2018-06-12 13:47:23 +090026import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
Jian Li6803ccd2018-06-08 09:26:09 +090027import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
28import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070029import org.osgi.service.component.annotations.Activate;
30import org.osgi.service.component.annotations.Component;
31import org.osgi.service.component.annotations.Deactivate;
32import org.osgi.service.component.annotations.Reference;
33import org.osgi.service.component.annotations.ReferenceCardinality;
Jian Li6803ccd2018-06-08 09:26:09 +090034import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090037import java.util.Set;
38
Jian Li6803ccd2018-06-08 09:26:09 +090039/**
40 * InfluxDB telemetry manager.
41 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070042@Component(immediate = true, service = InfluxDbTelemetryAdminService.class)
Jian Li6803ccd2018-06-08 09:26:09 +090043public class InfluxDbTelemetryManager implements InfluxDbTelemetryAdminService {
44
45 private final Logger log = LoggerFactory.getLogger(getClass());
46
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090047 private static final String FLOW_TYPE = "flowType";
48 private static final String DEVICE_ID = "deviceId";
49 private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
50 private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
51
52 private static final String VLAN_ID = "vlanId";
53 private static final String VXLAN_ID = "vxlanId";
54 private static final String SRC_IP = "srcIp";
55 private static final String DST_IP = "dstIp";
56 private static final String SRC_PORT = "srcPort";
57 private static final String DST_PORT = "dstPort";
58 private static final String PROTOCOL = "protocol";
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090059
60 private static final String STARTUP_TIME = "startupTime";
61 private static final String FST_PKT_ARR_TIME = "fstPktArrTime";
62 private static final String LST_PKT_OFFSET = "lstPktOffset";
63 private static final String PREV_ACC_BYTES = "prevAccBytes";
64 private static final String PREV_ACC_PKTS = "prevAccPkts";
65 private static final String CURR_ACC_BYTES = "currAccBytes";
66 private static final String CURR_ACC_PKTS = "currAccPkts";
67 private static final String ERROR_PKTS = "errorPkts";
68 private static final String DROP_PKTS = "dropPkts";
69
Ray Milkeyd84f89b2018-08-17 14:54:17 -070070 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lid1ce10a2018-06-12 13:47:23 +090071 protected OpenstackTelemetryService openstackTelemetryService;
72
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090073 private static final String INFLUX_PROTOCOL = "http";
Jian Li6803ccd2018-06-08 09:26:09 +090074 private InfluxDB producer = null;
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +090075 private String database = null;
76 private String measurement = null;
Jian Li6803ccd2018-06-08 09:26:09 +090077
78 @Activate
79 protected void activate() {
Jian Lid1ce10a2018-06-12 13:47:23 +090080
81 openstackTelemetryService.addTelemetryService(this);
82
Jian Li6803ccd2018-06-08 09:26:09 +090083 log.info("Started");
84 }
85
86 @Deactivate
87 protected void deactivate() {
88 stop();
Jian Lid1ce10a2018-06-12 13:47:23 +090089
90 openstackTelemetryService.removeTelemetryService(this);
91
Jian Li6803ccd2018-06-08 09:26:09 +090092 log.info("Stopped");
93 }
94
95 @Override
96 public void start(TelemetryConfig config) {
97 if (producer != null) {
98 log.info("InfluxDB producer has already been started");
99 return;
100 }
101
102 InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
103
104 StringBuilder influxDbServerBuilder = new StringBuilder();
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900105 influxDbServerBuilder.append(INFLUX_PROTOCOL);
Jian Li6803ccd2018-06-08 09:26:09 +0900106 influxDbServerBuilder.append(":");
107 influxDbServerBuilder.append("//");
108 influxDbServerBuilder.append(influxDbConfig.address());
109 influxDbServerBuilder.append(":");
110 influxDbServerBuilder.append(influxDbConfig.port());
111
112 producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
113 influxDbConfig.username(), influxDbConfig.password());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900114 database = influxDbConfig.database();
115 measurement = influxDbConfig.measurement();
116
Jian Li6803ccd2018-06-08 09:26:09 +0900117 log.info("InfluxDB producer has Started");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900118
119 createDB();
Jian Li6803ccd2018-06-08 09:26:09 +0900120 }
121
122 @Override
123 public void stop() {
124 if (producer != null) {
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900125 producer.close();
Jian Li6803ccd2018-06-08 09:26:09 +0900126 producer = null;
127 }
128
boyoung21c5f5f42018-09-27 20:29:41 +0900129 log.info("InfluxDB producer has stopped");
Jian Li6803ccd2018-06-08 09:26:09 +0900130 }
131
132 @Override
133 public void restart(TelemetryConfig config) {
134 stop();
135 start(config);
136 }
137
138 @Override
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900139 public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
Jian Lid1ce10a2018-06-12 13:47:23 +0900140 if (producer == null) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900141 log.debug("InfluxDB telemetry service has not been enabled!");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900142 return;
Jian Lid1ce10a2018-06-12 13:47:23 +0900143 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900144
145 if (record.flowInfos().size() == 0) {
Jian Li6c92b3c2018-08-03 11:26:55 +0900146 log.debug("No record to publish");
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900147 return;
148 }
149
150 log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
151
152 BatchPoints batchPoints = BatchPoints.database(database).build();
153
154 for (FlowInfo flowInfo: record.flowInfos()) {
Jian Lif8b8c7f2018-08-27 18:49:04 +0900155 Point.Builder pointBuilder = Point
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900156 .measurement((measurement == null) ? record.measurement() : measurement)
157 .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
158 .tag(DEVICE_ID, flowInfo.deviceId().toString())
159 .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
160 .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900161 .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
162 .tag(SRC_IP, flowInfo.srcIp().toString())
163 .tag(DST_IP, flowInfo.dstIp().toString())
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900164 .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
165 .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900166 .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
167 .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
168 .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
169 .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
170 .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
171 .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
172 .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
173 .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
Jian Lif8b8c7f2018-08-27 18:49:04 +0900174 .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
175
176 if (flowInfo.vlanId() != null) {
177 pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
178 }
179
180 if (flowInfo.srcPort() != null) {
181 pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
182 }
183
184 if (flowInfo.dstPort() != null) {
185 pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
186 }
187
188 batchPoints.point(pointBuilder.build());
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900189 }
190 producer.write(batchPoints);
Jian Lid1ce10a2018-06-12 13:47:23 +0900191 }
192
193 @Override
194 public boolean isRunning() {
195 return producer != null;
Jian Li6803ccd2018-06-08 09:26:09 +0900196 }
Boyoung Jeong4d1c9d12018-07-20 17:09:20 +0900197
198 private void createDB() {
199 if (producer.databaseExists(database)) {
200 log.debug("Database {} is already created", database);
201 } else {
202 producer.createDatabase(database);
203 log.debug("Database {} is created", database);
204 }
205 }
206
207 private String getTpPort(TpPort tpPort) {
208 if (tpPort == null) {
209 return "";
210 }
211 return tpPort.toString();
212 }
Jian Li6803ccd2018-06-08 09:26:09 +0900213}